This file is indexed.

/usr/lib/python2.7/dist-packages/pgspecial/main.py is in python-pgspecial 1.7.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import os
import logging
from collections import namedtuple

from . import export
from .help.commands import helpcommands

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Argument styles: PGSpecial.execute uses these to decide how a handler is
# called.
RAW_QUERY = 2       # handler(cur=..., query=<full command text>)
PARSED_QUERY = 1    # handler(cur=..., pattern=..., verbose=...)
NO_QUERY = 0        # handler()

# Pager modes stored in PGSpecial.pager_config.
PAGER_OFF = 0
PAGER_LONG_OUTPUT = 1
PAGER_ALWAYS = 2

# Status message reported for each pager mode.
PAGER_MSG = {
    PAGER_OFF: "Pager usage is off.",
    PAGER_LONG_OUTPUT: "Pager is used for long output.",
    PAGER_ALWAYS: "Pager is always used.",
}

# Record kept in a command dictionary for every registered special command.
SpecialCommand = namedtuple(
    'SpecialCommand',
    [
        'handler',
        'syntax',
        'description',
        'arg_type',
        'hidden',
        'case_sensitive',
    ])

@export
class CommandNotFound(Exception):
    """Raised when a special command does not match any registered command."""


@export
class PGSpecial(object):
    """Registry and dispatcher for backslash ("special") commands.

    Every instance starts from a copy of the class-level ``default_commands``
    and then registers the commands that need per-instance state (help,
    expanded output, pset, timing and pager).

    Handlers in this class return a list of 4-tuples.  As used here, the
    second and third slots carry table rows and headers, the fourth carries a
    plain message, and the first slot is always ``None``.
    """

    # Default static commands that don't rely on PGSpecial state are registered
    # via the special_command decorator and stored in default_commands
    default_commands = {}

    def __init__(self):
        # Work on a copy so per-instance registrations never leak into the
        # shared class-level defaults.
        self.commands = self.default_commands.copy()
        self.timing_enabled = False
        self.expanded_output = False
        self.auto_expand = False
        self.pager_config = PAGER_ALWAYS
        # Remember the pager that was configured at construction time so that
        # set_pager() can restore it later.
        self.pager = os.environ.get('PAGER', '')

        self.register(self.show_help, '\\?', '\\?', 'Show Commands.',
                      arg_type=PARSED_QUERY)

        self.register(self.toggle_expanded_output, '\\x', '\\x',
                      'Toggle expanded output.', arg_type=PARSED_QUERY)

        self.register(self.call_pset, '\\pset', '\\pset [key] [value]',
                      'A limited version of traditional \\pset',
                      arg_type=PARSED_QUERY)

        self.register(self.show_command_help, '\\h', '\\h',
                      'Show SQL syntax and help.', arg_type=PARSED_QUERY)

        self.register(self.toggle_timing, '\\timing', '\\timing',
                      'Toggle timing of commands.', arg_type=NO_QUERY)

        self.register(self.set_pager, '\\pager', '\\pager [command]',
                      'Set PAGER. Print the query results via PAGER.',
                      arg_type=PARSED_QUERY)

    def register(self, *args, **kwargs):
        """Register a command on this instance only.

        Arguments are forwarded unchanged to ``register_special_command``
        with ``command_dict`` bound to ``self.commands``.
        """
        register_special_command(*args, command_dict=self.commands, **kwargs)

    def execute(self, cur, sql):
        """Parse ``sql`` as a special command and run its handler.

        :param cur: cursor object passed through to handlers that accept one.
        :param sql: the full command text as typed.
        :return: whatever the handler returns; ``None`` if the command's
            ``arg_type`` is not one of the three known values.
        :raises CommandNotFound: if no command matches, or if only the
            lower-cased name matches a command that is case sensitive.
        """
        commands = self.commands
        command, verbose, pattern = parse_special_command(sql)

        # Try the exact spelling first; fall back to the lower-cased spelling,
        # which is only acceptable for case-insensitive commands.
        special_cmd = commands.get(command)
        if special_cmd is None:
            special_cmd = commands.get(command.lower())
            if special_cmd is None or special_cmd.case_sensitive:
                raise CommandNotFound('Command not found: %s' % command)

        if special_cmd.arg_type == NO_QUERY:
            return special_cmd.handler()
        elif special_cmd.arg_type == PARSED_QUERY:
            return special_cmd.handler(cur=cur, pattern=pattern, verbose=verbose)
        elif special_cmd.arg_type == RAW_QUERY:
            return special_cmd.handler(cur=cur, query=sql)

    def show_help(self, pattern, **_):
        """List every non-hidden command, or delegate to SQL help.

        A non-blank ``pattern`` is handed to ``show_command_help``; otherwise
        the result is a table of (syntax, description) rows sorted by the
        command's registration key.
        """
        if pattern.strip():
            return self.show_command_help(pattern)

        headers = ['Command', 'Description']
        result = [(cmd.syntax, cmd.description)
                  for _name, cmd in sorted(self.commands.items())
                  if not cmd.hidden]
        return [(None, result, headers, None)]

    def show_command_help_listing(self):
        """Return the sorted ``helpcommands`` keys as rows of up to six."""
        table = chunks(sorted(helpcommands.keys()), 6)
        return [(None, table, [], None)]

    def show_command_help(self, pattern, **_):
        """Return help text for the entry of ``helpcommands`` named by ``pattern``.

        The lookup key is ``pattern`` stripped and upper-cased.  A blank
        pattern yields the full listing; an unknown one yields a message
        that quotes the pattern exactly as it was given.
        """
        command = pattern.strip().upper()
        message = ""

        if not command:
            return self.show_command_help_listing()

        if command in helpcommands:
            helpcommand = helpcommands[command]

            if "description" in helpcommand:
                message += helpcommand["description"]
            if "synopsis" in helpcommand:
                message += "\nSyntax:\n"
                message += helpcommand["synopsis"]
        else:
            message = "No help available for \"%s\"" % pattern
            message += "\nTry \\h with no arguments to see available help."

        return [(None, None, None, message)]

    def toggle_expanded_output(self, pattern, **_):
        """Set or toggle expanded display.

        ``pattern`` may be ``auto``, ``on`` or ``off``; any other value
        (including blank) toggles.  A toggle turns expanded display on only
        when both ``expanded_output`` and ``auto_expand`` are currently off.
        """
        flag = pattern.strip()
        if flag == "auto":
            self.auto_expand = True
            self.expanded_output = False
            return [(None, None, None, u"Expanded display is used automatically.")]
        elif flag == "off":
            self.expanded_output = False
        elif flag == "on":
            self.expanded_output = True
        else:
            self.expanded_output = not (self.expanded_output or self.auto_expand)

        # Outside the "auto" branch, auto_expand always mirrors the explicit
        # setting that was just chosen.
        self.auto_expand = self.expanded_output
        message = u"Expanded display is "
        message += u"on." if self.expanded_output else u"off."
        return [(None, None, None, message)]

    def toggle_timing(self):
        """Flip ``timing_enabled`` and report the new state."""
        self.timing_enabled = not self.timing_enabled
        message = "Timing is "
        message += "on." if self.timing_enabled else "off."
        return [(None, None, None, message)]

    def call_pset(self, pattern, **_):
        """Dispatch ``key [value]`` to the matching ``pset_<key>`` method.

        The key and value may be separated by any run of whitespace.  A key
        with no matching ``pset_<key>`` attribute produces a "not supported"
        message instead of an error.
        """
        # Split on runs of whitespace so that repeated spaces or tabs between
        # key and value do not produce an empty value.
        parts = pattern.split(None, 2)
        key = parts[0] if parts else ''
        val = parts[1] if len(parts) > 1 else ''
        if hasattr(self, 'pset_' + key):
            return getattr(self, 'pset_' + key)(val)
        else:
            return [(None, None, None, "'%s' is currently not supported by pset" % key)]

    def pset_pager(self, value):
        """Set ``pager_config`` from ``always``, ``off`` or ``on``.

        Any other value toggles: long-output mode becomes off, and every
        other mode becomes long-output mode.
        """
        if value == 'always':
            self.pager_config = PAGER_ALWAYS
        elif value == 'off':
            self.pager_config = PAGER_OFF
        elif value == 'on':
            self.pager_config = PAGER_LONG_OUTPUT
        elif self.pager_config == PAGER_LONG_OUTPUT:
            self.pager_config = PAGER_OFF
        else:
            self.pager_config = PAGER_LONG_OUTPUT
        return [(None, None, None, '%s' % PAGER_MSG[self.pager_config])]

    def set_pager(self, pattern, **_):
        """Set or reset the ``PAGER`` environment variable.

        A non-empty ``pattern`` becomes the new ``PAGER``.  An empty one
        restores the value remembered in ``self.pager``, or removes ``PAGER``
        from the environment when that remembered value is empty.
        """
        if not pattern:
            if not self.pager:
                os.environ.pop('PAGER', None)
                msg = 'Pager reset to system default.'
            else:
                os.environ['PAGER'] = self.pager
                msg = 'Reset pager back to default. Default: %s' % self.pager
        else:
            os.environ['PAGER'] = pattern
            msg = 'PAGER set to %s.' % pattern

        return [(None, None, None, msg)]


@export
def content_exceeds_width(row, width):
    """Tell whether ``row`` is estimated to be wider than ``width``.

    The estimate is the summed length of every cell in ``row``, plus three
    characters of separator per cell, plus two characters of buffer.
    """
    # Three separator characters are budgeted for each column.
    padding = 3 * len(row)
    cell_chars = 0
    for cell in row:
        cell_chars += len(cell)
    # Two extra characters act as a small safety buffer.
    estimated = cell_chars + padding + 2
    return estimated > width


@export
def parse_special_command(sql):
    """Split a special-command line into ``(command, verbose, arg)``.

    The text before the first space is the command word; everything after
    it is the argument.  ``verbose`` is true when the command word contains
    a ``+``, and every ``+`` is removed from the returned command.  The
    returned argument is stripped of surrounding whitespace.
    """
    head, _sep, tail = sql.partition(' ')
    is_verbose = '+' in head
    name = head.strip().replace('+', '')
    return (name, is_verbose, tail.strip())


def special_command(command, syntax, description, arg_type=PARSED_QUERY,
        hidden=False, case_sensitive=True, aliases=()):
    """Build a decorator that registers a function as a default command.

    The decorated function is stored in ``PGSpecial.default_commands`` via
    ``register_special_command`` and is returned unchanged.
    """

    def decorator(handler):
        register_special_command(
            handler, command, syntax, description,
            arg_type=arg_type,
            hidden=hidden,
            case_sensitive=case_sensitive,
            aliases=aliases,
            command_dict=PGSpecial.default_commands)
        return handler

    return decorator


def register_special_command(handler, command, syntax, description,
        arg_type=PARSED_QUERY, hidden=False, case_sensitive=True, aliases=(),
        command_dict=None):
    """Store ``handler`` in ``command_dict`` under ``command`` and its aliases.

    Keys are lower-cased when ``case_sensitive`` is false.  Alias entries are
    always stored with ``hidden=True``, regardless of the ``hidden`` argument.
    """

    def as_key(name):
        # Case-insensitive commands are stored under their lower-case name.
        return name if case_sensitive else name.lower()

    primary_key = as_key(command)
    command_dict[primary_key] = SpecialCommand(
        handler=handler,
        syntax=syntax,
        description=description,
        arg_type=arg_type,
        hidden=hidden,
        case_sensitive=case_sensitive)

    for alias in aliases:
        alias_key = as_key(alias)
        command_dict[alias_key] = SpecialCommand(
            handler=handler,
            syntax=syntax,
            description=description,
            arg_type=arg_type,
            hidden=True,
            case_sensitive=case_sensitive)
def chunks(l, n):
    """Cut ``l`` into consecutive slices of at most ``n`` items each.

    ``n`` is raised to 1 when it is smaller than that.  The result is a list
    of slices of ``l``; the last one may be shorter than the others.
    """
    step = max(1, n)
    pieces = []
    for start in range(0, len(l), step):
        pieces.append(l[start:start + step])
    return pieces

@special_command(
    '\\e',
    '\\e [file]',
    'Edit the query with external editor.',
    arg_type=NO_QUERY)
def doc_only():
    """Handler that always raises ``RuntimeError``.

    NOTE(review): judging by the name, this entry presumably exists only so
    the command is listed in help, with the real handling done elsewhere --
    confirm against the caller.
    """
    raise RuntimeError()


@special_command(
    '\\ef',
    '\\ef [funcname [line]]',
    'Edit the contents of the query buffer.',
    arg_type=NO_QUERY, hidden=True)
@special_command(
    '\\sf',
    '\\sf[+] FUNCNAME',
    "Show a function's definition.",
    arg_type=NO_QUERY, hidden=True)
@special_command(
    '\\do',
    '\\do[S] [pattern]',
    'List operators.',
    arg_type=NO_QUERY, hidden=True)
@special_command(
    '\\dp',
    '\\dp [pattern]',
    'List table, view, and sequence access privileges.',
    arg_type=NO_QUERY, hidden=True)
@special_command(
    '\\z',
    '\\z [pattern]',
    'Same as \\dp.',
    arg_type=NO_QUERY, hidden=True)
def place_holder():
    """Shared stub for hidden commands; always raises ``NotImplementedError``."""
    raise NotImplementedError()