/usr/lib/python2.7/dist-packages/pbcommand/cli/examples/dev_scatter_fasta_app.py is in python-pbcommand 0.2.17-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Example of generating a Chunk.json file that 'scatters' a pair of fasta files.

In the example, the first fasta file is chunked, while the path to the second
fasta file is passed directly.

It generates fasta_1_id and fasta_2_id chunk keys.

There's a bit of code here that is copied from pbsmrtpipe.tools.chunk_utils.
Martin will eventually refactor this into pbcore.
"""
import os
import logging
import sys
import warnings
import math
import datetime
from pbcommand.cli import pbparser_runner
from pbcommand.models import get_scatter_pbparser, FileTypes, PipelineChunk
from pbcommand.pb_io import write_pipeline_chunks
from pbcommand.utils import setup_log
# Tool id and version handed to the parser built in get_parser().
__version__ = '0.1.0'
TOOL_ID = "pbcommand.tasks.dev_scatter_fasta"

# Module-level logger used by the functions in this file.
log = logging.getLogger(__name__)

# pbcore is only needed when the chunking functions actually run. If it is
# missing, importing this module still succeeds with a warning, and
# FastaWriter/FastaReader are simply left undefined.
try:
    from pbcore.io import FastaWriter, FastaReader
except ImportError:
    warnings.warn("Example apps require pbcore. Install from https://github.com/PacificBiosciences/pbcore")
class Constants(object):
    """Option and chunk-key identifiers used by the runner functions."""

    # Chunk key under which each chunk stores the path of its fasta file.
    FA_CHUNK_KEY = "$chunk.fasta_id"
    # Task option id looked up by rtc_runner to get the number of chunks.
    NCHUNKS_OPT = "pbcommand.task_options.dev_scatter_fa_nchunks"
def __get_nrecords_from_reader(reader):
    """Return the number of items *reader* yields.

    The reader is iterated to exhaustion, so a one-shot iterator cannot be
    reused afterwards.
    """
    return sum(1 for _ in reader)
def write_fasta_records(fastax_writer_klass, records, file_name):
    """Write every record in *records* to *file_name*.

    :param fastax_writer_klass: called with ``file_name``; the result is used
        as a context manager exposing ``writeRecord(record)``
    :param records: iterable of records to write
    :param file_name: path handed to the writer class
    :return: None; the number of records written is only logged at debug level
    """
    written = 0
    with fastax_writer_klass(file_name) as writer:
        # enumerate(..., 1) keeps a running count; it stays 0 for no records.
        for written, record in enumerate(records, 1):
            writer.writeRecord(record)
    log.debug("Completed writing {n} fasta records".format(n=written))
def __to_chunked_fastx_files(fastx_reader_klass, fastax_writer_klass, chunk_key, fastx_path, max_total_nchunks, dir_name, base_name, ext):
    """Split a Fasta/Fastq file into chunk files, yielding a PipelineChunk per file.

    The input is read twice: once to count its records and once to copy them
    into the chunk files. Every chunk except the last receives
    ``nrecords // nchunks`` records; the last chunk receives all remaining
    records, so no record is lost when the count is not evenly divisible
    (e.g. 10 records in 3 chunks are split 3, 3, 4).

    This is a generator: chunk files are written lazily as it is consumed.

    :param fastx_reader_klass: called with ``fastx_path``; the result is used
        as a context manager that iterates over records. Each record must
        expose a ``sequence`` attribute supporting ``len``.
    :param fastax_writer_klass: writer class passed on to write_fasta_records
    :param chunk_key: key under which each chunk stores the absolute path of
        its file
    :param fastx_path: path of the file to split
    :param max_total_nchunks: upper bound on the number of chunks; fewer are
        produced when the file holds fewer records
    :param dir_name: directory the chunk files are written to
    :param base_name: chunk files/ids are named ``<base_name>_<index>``
    :param ext: file extension of the chunk files (without the dot)
    :return: generator of PipelineChunk, each carrying ``total_bases``,
        ``nrecords`` and ``chunk_key``. Nothing is yielded when the input has
        no records or ``max_total_nchunks`` is less than 1.
    """
    # First pass: count the records so the chunk size can be derived.
    with fastx_reader_klass(fastx_path) as reader:
        nrecords = __get_nrecords_from_reader(reader)

    # Never create more chunks than there are records.
    total_chunks = min(nrecords, max_total_nchunks)
    if total_chunks < 1:
        # Nothing to scatter; also avoids dividing by zero below.
        return

    # Size of every chunk but the last (the last one absorbs the remainder).
    records_per_chunk = nrecords // total_chunks
    last_index = total_chunks - 1

    # Second pass: stream the records into the chunk files.
    # range()/next() are used instead of xrange()/.next() so this runs on
    # both Python 2 and Python 3.
    with fastx_reader_klass(fastx_path) as reader:
        record_iter = iter(reader)
        for index in range(total_chunks):
            chunk_id = "_".join([base_name, str(index)])
            chunk_name = ".".join([chunk_id, ext])
            fasta_chunk_path = os.path.join(dir_name, chunk_name)

            if index != last_index:
                records = [next(record_iter) for _ in range(records_per_chunk)]
            else:
                # Last chunk: take whatever is left in the file.
                records = list(record_iter)

            write_fasta_records(fastax_writer_klass, records, fasta_chunk_path)

            total_bases = sum(len(rec.sequence) for rec in records)
            attrs = dict(total_bases=total_bases, nrecords=len(records))
            attrs[chunk_key] = os.path.abspath(fasta_chunk_path)
            yield PipelineChunk(chunk_id, **attrs)
def to_chunked_fasta_files(fasta_path, max_total_nchunks, dir_name, chunk_key, base_name, ext):
    """Chunk a fasta file using the FastaReader/FastaWriter classes.

    Thin wrapper: it fixes the reader and writer classes and forwards the
    remaining arguments to the generic fastx chunker, returning its result.
    """
    reader_klass, writer_klass = FastaReader, FastaWriter
    return __to_chunked_fastx_files(
        reader_klass, writer_klass, chunk_key, fasta_path,
        max_total_nchunks, dir_name, base_name, ext)
def write_chunks_to_json(chunks, chunk_file):
    """Write *chunks* to *chunk_file* via write_pipeline_chunks.

    :param chunks: sized collection of chunks (``len`` is taken of it)
    :param chunk_file: output path handed to write_pipeline_chunks
    :return: always 0
    """
    # Taken up front so an unsized ``chunks`` still fails before any write.
    nchunks = len(chunks)
    comment = "Chunks written at {d}".format(d=datetime.datetime.now())
    write_pipeline_chunks(chunks, chunk_file, comment)
    # Log only once the write has returned, so "Wrote" is never reported for
    # a write that raised.
    log.debug("Wrote {n} chunks to {f}.".format(n=nchunks, f=chunk_file))
    return 0
def _write_fasta_chunks_to_file(to_chunk_fastx_file_func, chunk_file, fastx_path, max_total_chunks, dir_name, chunk_key, chunk_base_name, chunk_ext):
    """Run a chunking function and record the chunks it produces in *chunk_file*.

    :param to_chunk_fastx_file_func: called as
        ``f(fastx_path, max_total_chunks, dir_name, chunk_key,
        chunk_base_name, chunk_ext)``; must return an iterable of chunks
    :return: always 0
    """
    chunk_iter = to_chunk_fastx_file_func(
        fastx_path, max_total_chunks, dir_name,
        chunk_key, chunk_base_name, chunk_ext)
    # Materialize into a list first: the writer takes len() of the chunks.
    write_chunks_to_json(list(chunk_iter), chunk_file)
    return 0
def write_fasta_chunks_to_file(chunk_file, fasta_path, max_total_chunks, dir_name, chunk_key, chunk_base_name, chunk_ext):
    """Chunk a fasta file and write the resulting chunks to *chunk_file*.

    Delegates to _write_fasta_chunks_to_file with the fasta chunking function
    and returns its result.
    """
    chunker = to_chunked_fasta_files
    return _write_fasta_chunks_to_file(
        chunker, chunk_file, fasta_path, max_total_chunks,
        dir_name, chunk_key, chunk_base_name, chunk_ext)
def run_main(fasta_file, chunk_output_json, chunk_key, max_nchunks, nchunks=None, chunk_base_name="fasta"):
    """Create a Chunk.json file with nchunks <= max_nchunks

    The chunked fasta files are written into the directory that contains
    ``chunk_output_json``.

    Not clear on the nchunks vs max_nchunks: ``nchunks`` is accepted but not
    used here; only ``max_nchunks`` is forwarded.
    """
    chunk_ext = "fasta"
    chunk_dir = os.path.dirname(chunk_output_json)
    return write_fasta_chunks_to_file(
        chunk_output_json, fasta_file, max_nchunks,
        chunk_dir, chunk_key, chunk_base_name, chunk_ext)
def get_parser():
    """Build the scatter parser for this tool.

    Registers one fasta input, one chunk JSON output and the integer
    ``nchunks`` option. The chunk key and the option id are taken from
    Constants so they cannot drift from the values the runner functions use.

    :return: the parser returned by get_scatter_pbparser
    """
    driver = "python -m pbcommand.cli.examples.dev_scatter_fasta_app --resolved-tool-contract "
    desc = "Scatter a single fasta file to create chunk.json file"
    # chunk keys that will be written to the file
    chunk_keys = (Constants.FA_CHUNK_KEY, )
    p = get_scatter_pbparser(TOOL_ID, __version__, "Fasta Scatter",
                             desc, driver, chunk_keys, is_distributed=False)
    p.add_input_file_type(FileTypes.FASTA, "fasta_in", "Fasta In", "Fasta file to scatter")
    p.add_output_file_type(FileTypes.CHUNK, "cjson", "Chunk JSON", "Scattered/Chunked Fasta Chunk.json", "fasta.chunks.json")
    # Same option id that rtc_runner looks up in the resolved tool contract.
    p.add_int(Constants.NCHUNKS_OPT, "nchunks", 10, "Number of chunks",
              "Suggested number of chunks. May be overridden by $max_nchunks")
    return p
def args_runner(args):
    """Run the scatter from parsed command-line arguments.

    Reads ``fasta_in``, ``cjson`` and ``nchunks`` from *args* and returns
    whatever run_main returns.
    """
    fasta_in, chunk_json = args.fasta_in, args.cjson
    return run_main(fasta_in, chunk_json, Constants.FA_CHUNK_KEY, args.nchunks)
def rtc_runner(rtc):
    """Run the scatter from a resolved tool contract.

    Uses the task's first input file, first output file and the option stored
    under Constants.NCHUNKS_OPT; returns whatever run_main returns.
    """
    task = rtc.task
    fasta_in = task.input_files[0]
    chunk_json = task.output_files[0]
    max_nchunks = task.options[Constants.NCHUNKS_OPT]
    return run_main(fasta_in, chunk_json, Constants.FA_CHUNK_KEY, max_nchunks)
def main(argv=sys.argv):
    """Command-line entry point.

    :param argv: full argument vector; the first element is dropped before
        the rest is handed to pbparser_runner
    :return: whatever pbparser_runner returns
    """
    cli_args = argv[1:]
    parser = get_parser()
    return pbparser_runner(cli_args, parser, args_runner, rtc_runner, log, setup_log)
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_code = main()
    sys.exit(exit_code)
|