This file is indexed.

/usr/lib/python2.7/dist-packages/GenomicConsensus/main.py is in python-pbgenomicconsensus 2.1.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.
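For orientation, here is a minimal, hypothetical sketch of how this entry point could be driven programmatically. The flags mirror those assembled in resolved_tool_contract_runner near the bottom of the file; the program name and all file paths are placeholders, not values taken from the package:

    from GenomicConsensus.main import main

    # main() discards argv[0] (the program name) and hands the rest to
    # the pbcommand parser; these flags are all defined in this file.
    main(["variantCaller",
          "aligned_reads.bam",
          "--reference", "reference.fasta",
          "--outputFilename", "variants.gff",
          "--algorithm", "arrow",
          "--numWorkers", "4"])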

#!/usr/bin/env python
#################################################################################
# Copyright (c) 2011-2013, Pacific Biosciences of California, Inc.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
#   notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
#   notice, this list of conditions and the following disclaimer in the
#   documentation and/or other materials provided with the distribution.
# * Neither the name of Pacific Biosciences nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
# THIS LICENSE.  THIS SOFTWARE IS PROVIDED BY PACIFIC BIOSCIENCES AND ITS
# CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL PACIFIC BIOSCIENCES OR
# ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#################################################################################

# Author: David Alexander

from __future__ import absolute_import

import argparse, atexit, cProfile, gc, glob, h5py, logging, multiprocessing
import os, pstats, random, shutil, tempfile, time, threading, Queue, traceback
import functools
import re
import sys

import pysam

from pbcommand.utils import setup_log, Constants as LogFormats
from pbcommand.cli import pbparser_runner
from pbcore.io import AlignmentSet, ContigSet

from GenomicConsensus import reference
from GenomicConsensus.options import (options, Constants,
                                      get_parser,
                                      processOptions,
                                      resolveOptions,
                                      consensusCoreVersion,
                                      consensusCore2Version)
from GenomicConsensus.utils import (IncompatibleDataException,
                                    datasetCountExceedsThreshold,
                                    die)

class ToolRunner(object):
    """
    The main driver class for the GenomicConsensus tool.  It is assumed that
    arguments have already been parsed and used to populate the global
    'options' namespace before instantiating this class.
    """
    def __init__(self):
        self._inAlnFile = None
        self._resultsQueue = None
        self._workQueue = None
        self._slaves = None
        self._algorithm = None
        self._algorithmConfiguration = None
        self._aborting = False

    def _makeTemporaryDirectory(self):
        """
        Make a temp dir where we can stash things if necessary.
        """
        options.temporaryDirectory = tempfile.mkdtemp(prefix="GenomicConsensus-", dir="/tmp")
        logging.info("Created temporary directory %s" % (options.temporaryDirectory,) )

    def _algorithmByName(self, name, peekFile):
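        # Resolve the algorithm implementation module by name.  "best"
        # recurses with the name chosen by algorithmSelection for the
        # input's sequencing chemistry; unrecognized names are fatal.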
        if name == "plurality":
            from GenomicConsensus.plurality import plurality
            algo = plurality
        elif name == "quiver":
            from GenomicConsensus.quiver import quiver
            algo = quiver
        elif name == "arrow":
            from GenomicConsensus.arrow import arrow
            algo = arrow
            # All arrow models require the Ipd and PulseWidth base features,
            # except the P6-C4 and initial S/P1-C1 chemistries
            if set(peekFile.sequencingChemistry) - set(["P6-C4", "S/P1-C1/beta"]):
                if (not peekFile.hasBaseFeature("Ipd") or
                    not peekFile.hasBaseFeature("PulseWidth")):
                    die("Arrow model requires the IPD and PulseWidth base "
                        "features, but at least one is missing")
        elif name == "poa":
            from GenomicConsensus.poa import poa
            algo = poa
        elif name == "best":
            logging.info("Identifying best algorithm based on input data")
            from GenomicConsensus import algorithmSelection
            algoName = algorithmSelection.bestAlgorithm(peekFile.sequencingChemistry)
            return self._algorithmByName(algoName, peekFile)
        else:
            die("Failure: unrecognized algorithm %s" % name)
        isOK, msg = algo.availability
        if not isOK:
            die("Failure: %s" % msg)
        logging.info("Will use {a} algorithm".format(a=name))
        return algo

    def _launchSlaves(self):
        """
        Launch a group of worker processes (self._slaves), the queue
        (self._workQueue) that will be used to send them chunks of
        work, and the queue that will be used to receive back the
        results (self._resultsQueue).

        Additionally, launch the result collector process.
        """
        availableCpus = multiprocessing.cpu_count()
        logging.info("Available CPUs: %d" % (availableCpus,))
        logging.info("Requested workers: %d" % (options.numWorkers,))
        logging.info("Parallel Mode: %s" % ("Threaded" if options.threaded else "Process",))
        if (options.numWorkers > availableCpus):
            logging.warn("More workers requested (%d) than CPUs available (%d);"
                         " may result in suboptimal performance."
                         % (options.numWorkers, availableCpus))
        self._initQueues()

        WorkerType, ResultCollectorType = self._algorithm.slaveFactories(options.threaded)
        self._slaves = []
        for i in xrange(options.numWorkers):
            p = WorkerType(self._workQueue, self._resultsQueue, self._algorithmConfiguration)
            self._slaves.append(p)
            p.start()
        logging.info("Launched compute slaves.")

        rcp = ResultCollectorType(self._resultsQueue, self._algorithm.name, self._algorithmConfiguration)
        rcp.start()
        self._slaves.append(rcp)
        logging.info("Launched collector slave.")

    def _initQueues(self):
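        # Threaded mode can share in-process Queue.Queue objects; process
        # mode needs multiprocessing.Queue so work items can be pickled
        # across process boundaries.  Both are bounded by options.queueSize.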
        if options.threaded:
            self._workQueue = Queue.Queue(options.queueSize)
            self._resultsQueue = Queue.Queue(options.queueSize)
        else:
            self._workQueue = multiprocessing.Queue(options.queueSize)
            self._resultsQueue = multiprocessing.Queue(options.queueSize)

    def _readAlignmentInput(self):
        """
        Read the AlignmentSet input file and
        store it as self._inAlnFile.
        """
        fname = options.inputFilename
        self._inAlnFile = AlignmentSet(fname)

    def _loadReference(self, alnFile):
        logging.info("Loading reference")
        err = reference.loadFromFile(options.referenceFilename, alnFile)
        if err:
            die("Error loading reference")
        # Grok the referenceWindow spec, if any.
        if options.referenceWindowsAsString is None:
            options.referenceWindows = ()
        elif options.skipUnrecognizedContigs:
            # This is a workaround for smrtpipe scatter/gather.
            options.referenceWindows = []
            for s in options.referenceWindowsAsString.split(","):
                try:
                    win = reference.stringToWindow(s)
                    options.referenceWindows.append(win)
                except:
                    pass
        else:
            options.referenceWindows = map(reference.stringToWindow,
                                           options.referenceWindowsAsString.split(","))
        if options.referenceWindowsFromAlignment:
            options.referenceWindows = alnFile.refWindows

    def _checkFileCompatibility(self, alnFile):
        if not alnFile.isSorted:
            die("Input Alignment file must be sorted.")
        if alnFile.isEmpty:
            die("Input Alignment file must be nonempty.")

    def _shouldDisableChunkCache(self, alnFile):
        #if isinstance(alnFile, CmpH5Reader):
        #if alnFile.isCmpH5:
        #    threshold = options.autoDisableHdf5ChunkCache
        #    return datasetCountExceedsThreshold(alnFile, threshold)
        #else:
        #    return False
        return True

    def _configureAlgorithm(self, options, alnFile):
        assert self._algorithm is not None
        try:
            self._algorithmConfiguration = self._algorithm.configure(options, alnFile)
        except IncompatibleDataException as e:
            die("Failure: %s" % e.message)

    def _mainLoop(self):
        # Split up the reference genome into chunks and farm out each
        # chunk as a unit of work.
        logging.debug("Starting main loop.")
        ids = reference.enumerateIds(options.referenceWindows)
        for _id in ids:
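            # "Fancy" chunking consults the alignment file so chunk
            # boundaries can take coverage (minCoverage, minMapQV) into
            # account; plain chunking slices the reference into
            # fixed-size windows.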
            if options.fancyChunking:
                chunks = reference.fancyEnumerateChunks(self._inAlnFile,
                                                        _id,
                                                        options.referenceChunkSize,
                                                        options.minCoverage,
                                                        options.minMapQV,
                                                        options.referenceWindows)
            else:
                chunks = reference.enumerateChunks(_id,
                                                   options.referenceChunkSize,
                                                   options.referenceWindows)
            for chunk in chunks:
                if self._aborting: return
                self._workQueue.put(chunk)

        # Write sentinels ("end-of-work-stream")
        for i in xrange(options.numWorkers):
            self._workQueue.put(None)

    def _printProfiles(self):
        for profile in glob.glob(os.path.join(options.temporaryDirectory, "*")):
            pstats.Stats(profile).sort_stats("time").print_stats(20)

    def _cleanup(self):
        if options.doProfiling:
            logging.info("Removing %s" % options.temporaryDirectory)
            shutil.rmtree(options.temporaryDirectory, ignore_errors=True)

    def _setupEvidenceDumpDirectory(self, directoryName):
        if os.path.exists(directoryName):
            shutil.rmtree(directoryName)
        os.makedirs(directoryName)

    @property
    def aborting(self):
        return self._aborting

    def abortWork(self, why):
        """
        Performs a shutdown of all the slave processes.  Called by the
        monitoring thread when a child process exits with a nonzero
        exit code, or when a keyboard interrupt (Ctrl-C) is received.
        Not called during normal shutdown.
        """
        logging.error(why)
        self._aborting = True
        self._resultsQueue.close()
        self._workQueue.close()

    @property
    def slaves(self):
        return self._slaves

    def main(self):

        # This looks scary but it's not.  Python uses reference
        # counting and has a secondary, optional garbage collector for
        # collecting garbage cycles.  Unfortunately, when a cyclic GC
        # pass runs while a thread is calling cPickle.dumps, the
        # interpreter sometimes crashes.  See Bug 19704.  Since we
        # don't leak garbage cycles, disabling the cyclic GC is
        # essentially harmless.
        gc.disable()

        random.seed(42)

        if options.pdb or options.pdbAtStartup:
            print >>sys.stderr, "Process ID: %d" % os.getpid()
            try:
                import ipdb
            except ImportError:
                die("Debugging options require 'ipdb' package installed.")

            if not options.threaded:
                die("Debugging only works with -T (threaded) mode")

        if options.pdbAtStartup:
            ipdb.set_trace()

        logging.info("h5py version: %s" % h5py.version.version)
        logging.info("hdf5 version: %s" % h5py.version.hdf5_version)
        logging.info("ConsensusCore version: %s" %
                     (consensusCoreVersion() or "ConsensusCore unavailable"))
        logging.info("ConsensusCore2 version: %s" %
                     (consensusCore2Version() or "ConsensusCore2 unavailable"))
        logging.info("Starting.")

        atexit.register(self._cleanup)
        if options.doProfiling:
            self._makeTemporaryDirectory()

        with AlignmentSet(options.inputFilename) as peekFile:
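            # "Peek" pass: open the input briefly to validate it, resolve
            # options against it, and pick and configure the algorithm
            # before any workers are launched.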
            if options.algorithm == "arrow" and peekFile.isCmpH5:
                die("Arrow does not support CmpH5 files")
            if not peekFile.isCmpH5 and not peekFile.hasPbi:
                die("Genomic Consensus only works with cmp.h5 files and BAM "
                    "files with accompanying .pbi files")
            logging.info("Peeking at file %s" % options.inputFilename)
            logging.info("Input data: numAlnHits=%d" % len(peekFile))
            resolveOptions(peekFile)
            self._loadReference(peekFile)
            self._checkFileCompatibility(peekFile)
            self._algorithm = self._algorithmByName(options.algorithm, peekFile)
            self._configureAlgorithm(options, peekFile)
            options.disableHdf5ChunkCache = True
            #options.disableHdf5ChunkCache = self._shouldDisableChunkCache(peekFile)
            #if options.disableHdf5ChunkCache:
            #    logging.info("Will disable HDF5 chunk cache (large number of datasets)")
            #logging.debug("After peek, # hdf5 objects open: %d" % h5py.h5f.get_obj_count())

        if options.dumpEvidence:
            self._setupEvidenceDumpDirectory(options.evidenceDirectory)

        self._launchSlaves()
        self._readAlignmentInput()

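        # Watch the slaves from a separate thread so that a crashed
        # worker aborts the whole run promptly (see monitorSlaves below).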
        monitoringThread = threading.Thread(target=monitorSlaves, args=(self,))
        monitoringThread.start()

        try:
            if options.doProfiling:
                cProfile.runctx("self._mainLoop()",
                                globals=globals(),
                                locals=locals(),
                                filename=os.path.join(options.temporaryDirectory,
                                                      "profile-main.out"))

            elif options.pdb:
                with ipdb.launch_ipdb_on_exception():
                    self._mainLoop()

            else:
                self._mainLoop()
        except:
            why = traceback.format_exc()
            self.abortWork(why)

        monitoringThread.join()

        if self._aborting:
            logging.error("Aborting")
            return -1
        else:
            logging.info("Finished.")

        if options.doProfiling:
            self._printProfiles()

        # Close the alignment input file (cmp.h5 or BAM).
        self._inAlnFile.close()
        return 0

def monitorSlaves(driver):
    """
    Promptly aborts if a child is found to have exited with a nonzero
    exit code; otherwise returns when all processes exit cleanly (0).

    This approach is portable--catching SIGCHLD doesn't work on
    Windows.
    """
    while not driver.aborting:
        all_exited = all(not p.is_alive() for p in driver.slaves)
        nonzero_exits = [p.exitcode for p in driver.slaves if p.exitcode]
        if nonzero_exits:
            exitcode = nonzero_exits[0]
            driver.abortWork("Child process exited with exitcode=%d.  Aborting." % exitcode)
            return exitcode
        elif all_exited:
            return 0
        time.sleep(1)

def args_runner(args):
    options.__dict__.update(args.__dict__)
    processOptions()
    tr = ToolRunner()
    return tr.main()

def resolved_tool_contract_runner(resolved_contract):
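    # Translate the pbcommand resolved tool contract into an equivalent
    # argv list and re-enter the normal parsing path via args_runner; on
    # success, index the FASTA output and wrap it in a ContigSet.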
    rc = resolved_contract
    alignment_path = rc.task.input_files[0]
    reference_path = rc.task.input_files[1]
    gff_path = rc.task.output_files[0]
    dataset_path = rc.task.output_files[1]
    fasta_path = re.sub(r"\.contigset\.xml$", ".fasta", dataset_path)
    fastq_path = rc.task.output_files[2]
    args = [
        alignment_path,
        "--verbose",
        "--reference", reference_path,
        "--outputFilename", gff_path,
        "--outputFilename", fasta_path,
        "--outputFilename", fastq_path,
        "--numWorkers", str(rc.task.nproc),
        "--minCoverage", str(rc.task.options[Constants.MIN_COVERAGE_ID]),
        "--minConfidence", str(rc.task.options[Constants.MIN_CONFIDENCE_ID]),
        "--algorithm", rc.task.options[Constants.ALGORITHM_ID],
        "--alignmentSetRefWindows",
    ]
    if rc.task.options[Constants.DIPLOID_MODE_ID]:
        args.append("--diploid")
    args_ = get_parser().arg_parser.parser.parse_args(args)
    rc = args_runner(args_)
    if rc == 0:
        pysam.faidx(fasta_path)
        ds = ContigSet(fasta_path, strict=True)
        ds.write(dataset_path)
    return rc

def main(argv=sys.argv):
    setup_log_ = functools.partial(setup_log,
        str_formatter=LogFormats.LOG_FMT_LVL)
    return pbparser_runner(
        argv=argv[1:],
        parser=get_parser(),
        args_runner_func=args_runner,
        contract_runner_func=resolved_tool_contract_runner,
        alog=logging.getLogger(),
        setup_log_func=setup_log_)

if __name__ == "__main__":
    sys.exit(main(sys.argv))