This file is indexed.

/usr/lib/python2.7/dist-packages/cylc/dbstatecheck.py is in python-cylc 7.6.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2017 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import errno
import os
import sqlite3
import sys
from cylc.rundb import CylcSuiteDAO
from cylc.task_state import (
    TASK_STATUS_WAITING, TASK_STATUS_EXPIRED,
    TASK_STATUS_QUEUED, TASK_STATUS_READY, TASK_STATUS_SUBMITTED,
    TASK_STATUS_SUBMIT_RETRYING, TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED,
    TASK_STATUS_FAILED, TASK_STATUS_RETRYING)


class CylcSuiteDBChecker(object):
    """Object for querying a suite database"""
    STATE_ALIASES = {
        'finish': [TASK_STATUS_FAILED, TASK_STATUS_SUCCEEDED],
        'start': [
            TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
            TASK_STATUS_RETRYING],
        'submit': [
            TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_RETRYING,
            TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED, TASK_STATUS_FAILED,
            TASK_STATUS_RETRYING],
        'fail': [TASK_STATUS_FAILED],
        'succeed': [TASK_STATUS_SUCCEEDED],
    }

    def __init__(self, rund, suite):
        db_path = os.path.join(
            os.path.expanduser(rund), suite, "log",
            CylcSuiteDAO.DB_FILE_BASE_NAME)
        if not os.path.exists(db_path):
            raise OSError(errno.ENOENT, os.strerror(errno.ENOENT), db_path)
        self.conn = sqlite3.connect(db_path, timeout=10.0)

    def display_maps(self, res):
        if not res:
            sys.stderr.write("INFO: No results to display.\n")
        else:
            for row in res:
                sys.stdout.write((", ").join(row).encode("utf-8") + "\n")

    def get_remote_point_format(self):
        """Query a remote suite database for a 'cycle point format' entry"""
        for row in self.conn.execute(
                r"SELECT value FROM " + CylcSuiteDAO.TABLE_SUITE_PARAMS +
                r" WHERE key==?",
                ['cycle_point_format']):
            return row[0]

    def state_lookup(self, state):
        """allows for multiple states to be searched via a status alias"""
        if state in self.STATE_ALIASES:
            return self.STATE_ALIASES[state]
        else:
            return [state]

    def suite_state_query(
            self, task, cycle, status=None, message=None, mask=None):
        """run a query on the suite database"""
        stmt_args = []
        stmt_wheres = []

        if mask is None:
            mask = "name, cycle, status"

        if message:
            target_table = CylcSuiteDAO.TABLE_TASK_OUTPUTS
            mask = "outputs"
        else:
            target_table = CylcSuiteDAO.TABLE_TASK_STATES

        stmt = "select {0} from {1}".format(mask, target_table)
        if task is not None:
            stmt_wheres.append("name==?")
            stmt_args.append(task)
        if cycle is not None:
            stmt_wheres.append("cycle==?")
            stmt_args.append(cycle)

        if status:
            stmt_frags = []
            for state in self.state_lookup(status):
                stmt_args.append(state)
                stmt_frags.append("status==?")
            stmt_wheres.append("(" + (" OR ").join(stmt_frags) + ")")
        if stmt_wheres:
            stmt += " where " + (" AND ").join(stmt_wheres)

        res = []
        for row in self.conn.execute(stmt, stmt_args):
            res.append(list(row))

        return res

    def task_state_getter(self, task, cycle):
        """used to get the state of a particular task at a particular cycle"""
        return self.suite_state_query(task, cycle, mask="status")[0]

    def task_state_met(self, task, cycle, status=None, message=None):
        """used to check if a task is in a particular state"""
        res = self.suite_state_query(task, cycle, status, message)
        if status:
            return bool(res)
        elif message:
            for outputs_str, in res:
                for line in outputs_str.splitlines():
                    if message in line.split("=", 1):
                        return True
            return False

    def validate_mask(self, mask):
        fieldnames = ["name", "status", "cycle"]  # extract from rundb.py?
        for term in mask.split(","):
            if term.strip(" ") not in fieldnames:
                return False
        return True