/usr/lib/python2.7/dist-packages/pbcommand/resolver.py is in python-pbcommand 0.5.3-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 | """Driver for creating a Resolved Tool Contract from a Tool Contract"""
from collections import defaultdict
import logging
import os
import uuid
from pbcommand.models.common import (SymbolTypes, REGISTERED_FILE_TYPES,
ResourceTypes)
from pbcommand.models.tool_contract import (ResolvedToolContract,
ToolContract,
ResolvedToolContractTask,
ResolvedScatteredToolContractTask,
ResolvedGatherToolContractTask,
ToolContractResolvedResource)
log = logging.getLogger(__name__)
class ToolContractError(BaseException):
pass
def __resolve_int_or_symbol(symbol_type, symbol_or_int, max_value):
if isinstance(symbol_or_int, int):
return min(symbol_or_int, max_value)
elif symbol_or_int == symbol_type:
return max_value
else:
raise TypeError("unsupported type for {s} '{t}".format(t=symbol_or_int,
s=symbol_type))
def _resolve_nproc(nproc_int_or_symbol, max_nproc):
return __resolve_int_or_symbol(SymbolTypes.MAX_NPROC, nproc_int_or_symbol, max_nproc)
def _resolve_max_nchunks(nchunks_or_symbol, max_nchunks):
return __resolve_int_or_symbol(SymbolTypes.MAX_NCHUNKS, nchunks_or_symbol, max_nchunks)
def _resolve_options(tool_contract, tool_options):
""" Resolve Task Options from
:type tool_contract: ToolContract
:type tool_options: dict
:rtype: dict
"""
resolved_options = {}
# Get and Validate resolved value.
# TODO. None support should be removed.
for option in tool_contract.task.options:
# This hides whatever underlying JSON grossness remains
value = tool_options.get(option.option_id, option.default)
# Wrap in a try to communicate error messages with reasonable context
try:
# This expects the PacBioOption subclass to implement the
# necessary validating function
validated_option = option.validate_option(value)
resolved_options[option.option_id] = validated_option
except (KeyError, ValueError, IndexError, TypeError) as e:
raise ToolContractError("Incompatible option types for {o}. "
"Supplied {i}. Expected pacbio opt type '{t}' {e}".format(
o=option.option_id,
i=type(value),
t=option.OPTION_TYPE_ID, e=str(e)))
return resolved_options
def _resolve_output_file(registry_d, file_type, output_file_type, root_output_dir):
"""
Resolved the Output File Type
:type file_type: pbcommand.models.FileType
:type output_file_type: pbcommand.models.OutputFileType
:return: Resolved output file name
"""
def _get_fname(base, ext):
idx = base, ext
count = registry_d[idx]
xs = "" if count == 0 else "-" + str(count)
registry_d[idx] += 1
name = "".join([base, xs, ".", ext])
return os.path.join(root_output_dir, name)
# FIXME. THIS NEED TO BE FUNDAMENTALLY FIXED and updated to use the spec
# in the avro schema.
return _get_fname(output_file_type.default_name, file_type.ext)
def _resolve_resource_types(resources, output_dir, root_tmp_dir):
resolved_resources = []
def _add(rt_id, p):
r = ToolContractResolvedResource(rt_id, p)
resolved_resources.append(r)
return r
def _to_p(x):
return os.path.join(root_tmp_dir, x)
def _to_r(prefix, suffix=None):
u = uuid.uuid4()
name = "{x}-{u}".format(u=u, x=prefix)
if suffix is not None:
name += suffix
return _to_p(name)
# The names are not optimal, this would require more config
for resource in resources:
if resource == ResourceTypes.TMP_DIR:
path = _to_r("pb-tmp")
_add(resource, path)
elif resource == ResourceTypes.TMP_FILE:
_add(resource, _to_r("pb-tmp", "-file"))
elif resource == ResourceTypes.LOG_FILE:
u = uuid.uuid4()
name = "{x}-{u}-log".format(u=u, x="pb-tmp")
path = os.path.join(output_dir, name)
_add(resource, path)
else:
raise ValueError("Unsupported Resource Type {x}".format(x=resource))
return resolved_resources
def _resolve_output_files(output_file_types, root_output_dir):
# store the files as {(base, ext): count}
_outs_registry = defaultdict(lambda: 0)
return [_resolve_output_file(_outs_registry, REGISTERED_FILE_TYPES[f.file_type_id], f, root_output_dir) for f in output_file_types]
def _resolve_core(tool_contract, input_files, root_output_dir, max_nproc, tool_options, tmp_dir=None):
""" tool_options are dict{id:value} of values to override defaults """
if len(input_files) != len(tool_contract.task.input_file_types):
_d = dict(i=input_files, t=tool_contract.task.input_file_types)
raise ToolContractError("Incompatible input types. Supplied {i}. Expected file types {t}".format(**_d))
output_files = _resolve_output_files(tool_contract.task.output_file_types, root_output_dir)
resolved_options = _resolve_options(tool_contract, tool_options)
nproc = _resolve_nproc(tool_contract.task.nproc, max_nproc)
resolved_resources = _resolve_resource_types(tool_contract.task.resources, root_output_dir, tmp_dir)
return output_files, resolved_options, nproc, resolved_resources
def resolve_tool_contract(tool_contract, input_files, root_output_dir, root_tmp_dir, max_nproc, tool_options, is_distributable, log_level="INFO"):
"""
Convert a ToolContract into a Resolved Tool Contract.
:param tool_contract: Tool Contract interface
:param input_files: List of input files (must be consistent with the tool contract input file list (types are not enforced)
:param max_nproc: Max number of processors
:param tool_options: dict of overridden options
:type input_files: list[String]
:type max_nproc: int
:type tool_contract: ToolContract
:type tool_options: dict
:rtype: ResolvedToolContract
:return: A Resolved tool contract
"""
output_files, resolved_options, nproc, resources = _resolve_core(tool_contract, input_files, root_output_dir, max_nproc, tool_options, root_tmp_dir)
is_distributed = False
if is_distributable and tool_contract.task.is_distributed:
is_distributed = True
task = ResolvedToolContractTask(tool_contract.task.task_id,
is_distributed,
input_files,
output_files,
resolved_options,
nproc,
resources,
log_level=log_level)
return ResolvedToolContract(task, tool_contract.driver)
def resolve_scatter_tool_contract(tool_contract, input_files, root_output_dir, root_tmp_dir, max_nproc, tool_options, max_nchunks, chunk_keys, is_distributable, log_level="INFO"):
output_files, resolved_options, nproc, resources = _resolve_core(tool_contract, input_files, root_output_dir, max_nproc, tool_options, tmp_dir=root_tmp_dir)
resolved_max_chunks = _resolve_max_nchunks(tool_contract.task.max_nchunks, max_nchunks)
task = ResolvedScatteredToolContractTask(tool_contract.task.task_id,
tool_contract.task.is_distributed and is_distributable,
input_files,
output_files,
resolved_options,
nproc,
resources, resolved_max_chunks, chunk_keys, log_level=log_level)
return ResolvedToolContract(task, tool_contract.driver)
def resolve_gather_tool_contract(tool_contract, input_files, root_output_dir, root_tmp_dir, max_nproc, tool_options, chunk_key, is_distributable, log_level="INFO"):
output_files, resolved_options, nproc, resources = _resolve_core(tool_contract, input_files, root_output_dir, max_nproc, tool_options, tmp_dir=root_tmp_dir)
task = ResolvedGatherToolContractTask(tool_contract.task.task_id,
tool_contract.task.is_distributed and is_distributable,
input_files,
output_files,
resolved_options,
nproc,
resources, chunk_key,
log_level=log_level)
return ResolvedToolContract(task, tool_contract.driver)
|