/usr/lib/python2.7/dist-packages/pbcommand/testkit/core.py is in python-pbcommand 0.2.17-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | import os
import unittest
import logging
import subprocess
from .base_utils import (HAS_PBCORE,
pbcore_skip_msg,
get_temp_file,
get_temp_dir)
from pbcommand.resolver import (resolve_tool_contract,
resolve_gather_tool_contract,
resolve_scatter_tool_contract)
from pbcommand.models import ResolvedToolContract, PipelineChunk
from pbcommand.pb_io import (load_tool_contract_from,
load_resolved_tool_contract_from)
from pbcommand.pb_io.tool_contract_io import write_resolved_tool_contract
log = logging.getLogger(__name__)
class PbTestApp(unittest.TestCase):
"""Generic Harness for running tool contracts end-to-end"""
# if the base command is defined, DRIVER_EMIT and DRIVER_RESOLVE can be
# guessed automatically
DRIVER_BASE = None
# complete Emit a tool contract
DRIVER_EMIT = ""
# Run tool from a resolve tool contract JSON file
DRIVER_RESOLVE = ""
# Requires Pbcore
REQUIRES_PBCORE = False
# input files that will be passed to the resolver
# To get example files use, get_data_file("example.txt")]
INPUT_FILES = []
# Arguments passed to the Resolver
MAX_NPROC = 1
TASK_OPTIONS = {}
# These will be checked against the resolved tool contract values
RESOLVED_TASK_OPTIONS = {}
RESOLVED_NPROC = 1
@classmethod
def setUpClass(cls):
if cls.DRIVER_BASE is not None:
if cls.DRIVER_EMIT == "":
cls.DRIVER_EMIT = cls.DRIVER_BASE + " --emit-tool-contract "
if cls.DRIVER_RESOLVE == "":
cls.DRIVER_RESOLVE = cls.DRIVER_BASE + " --resolved-tool-contract "
def _test_outputs_exists(self, rtc):
""":type rtc: pbcommand.models.ResolvedToolContract"""
log.debug("validating output file existence from {r}".format(r=rtc))
log.debug("Resolved Output files from {t}".format(t=rtc.task.task_id))
log.debug(rtc.task.output_files)
for i, output_file in enumerate(rtc.task.output_files):
msg = "Unable to find {i}-th output file {p}".format(i=i, p=output_file)
self.assertTrue(os.path.exists(output_file), msg)
def _to_rtc(self, tc, output_dir, tmp_dir):
# handled the polymorphism in subclasses by overriding
return resolve_tool_contract(tc, self.INPUT_FILES, output_dir, tmp_dir, self.MAX_NPROC, self.TASK_OPTIONS)
def test_run_e2e(self):
# hack to skip running the base Test class (which is the nose default behavior)
if self.__class__.__name__ in ('PbTestApp', 'PbTestScatterApp', 'PbTestGatherApp'):
return
if self.REQUIRES_PBCORE:
if not HAS_PBCORE:
self.assertTrue(True, pbcore_skip_msg("Skipping running e2e for {d}".format(d=self.DRIVER_EMIT)))
return
output_dir = get_temp_dir(suffix="rtc-test")
tmp_dir = get_temp_dir(suffix="rtc-temp")
log.debug("Driver {e}".format(e=self.DRIVER_EMIT))
log.debug("input files {i}".format(i=self.INPUT_FILES))
log.debug("running in {p}".format(p=output_dir))
output_tc = get_temp_file("-{n}-tool_contract.json".format(n=self.__class__.__name__), output_dir)
emit_tc_exe = "{e} > {o}".format(e=self.DRIVER_EMIT, o=output_tc)
rcode = subprocess.call([emit_tc_exe], shell=True)
self.assertEquals(rcode, 0, "Emitting tool contract failed for '{e}'".format(e=emit_tc_exe))
# sanity marshall-unmashalling
log.debug("Loading tool-contract from {p}".format(p=output_tc))
tc = load_tool_contract_from(output_tc)
log.info("Resolving tool contract to RTC")
rtc = self._to_rtc(tc, output_dir, tmp_dir)
output_json_rtc = get_temp_file("resolved_tool_contract.json", output_dir)
write_resolved_tool_contract(rtc, output_json_rtc)
# sanity
loaded_rtc = load_resolved_tool_contract_from(output_json_rtc)
self.assertIsInstance(loaded_rtc, ResolvedToolContract)
# Test Resolved options if specified.
for opt, resolved_value in self.RESOLVED_TASK_OPTIONS.iteritems():
self.assertTrue(opt in rtc.task.options, "Resolved option {x} not in RTC options.".format(x=opt))
# this needs to support polymorphic equals (i.e., almostEquals
if not isinstance(resolved_value, float):
emsg = "Resolved option {o} are not equal. Expected {a}, got {b}".format(o=opt, b=rtc.task.options[opt], a=resolved_value)
self.assertEquals(rtc.task.options[opt], resolved_value, emsg)
# Resolved NPROC
self.assertEquals(rtc.task.nproc, self.RESOLVED_NPROC)
log.info("running resolved contract {r}".format(r=output_json_rtc))
exe = "{d} {p}".format(p=output_json_rtc, d=self.DRIVER_RESOLVE)
log.info("Running exe '{e}'".format(e=exe))
rcode = subprocess.call([exe], shell=True)
self.assertEqual(rcode, 0, "Running from resolved tool contract failed from {e}".format(e=exe))
log.info("Successfully completed running e2e for {d}".format(d=self.DRIVER_EMIT))
self._test_outputs_exists(rtc)
self.run_after(rtc, output_dir)
def run_after(self, rtc, output_dir):
"""
Optional additional test code, e.g. to verify that the job produced
the expected outputs. This is run automatically by test_run_e2e, but
does nothing unless overridden in a subclass.
"""
pass
class PbTestScatterApp(PbTestApp):
"""Test harness for testing end-to-end scattering apps
Override MAX_NCHUNKS, RESOLVED_MAX_NCHUNKS and CHUNK_KEYS
"""
MAX_NCHUNKS = 12
RESOLVED_MAX_NCHUNKS = 12
CHUNK_KEYS = ()
def _to_rtc(self, tc, output_dir, tmp_dir):
return resolve_scatter_tool_contract(tc, self.INPUT_FILES, output_dir, tmp_dir, self.MAX_NPROC, self.TASK_OPTIONS, self.MAX_NCHUNKS, self.CHUNK_KEYS)
class PbTestGatherApp(PbTestApp):
"""Test harness for testing end-to-end gather apps
Override the CHUNK_KEY to pass that into your resolver
"""
CHUNK_KEY = PipelineChunk.CHUNK_KEY_PREFIX + 'fasta_id'
def _to_rtc(self, tc, output_dir, tmp_dir):
return resolve_gather_tool_contract(tc, self.INPUT_FILES, output_dir, tmp_dir, self.MAX_NPROC, self.TASK_OPTIONS, self.CHUNK_KEY)
|