This file is indexed.

/usr/lib/python3/dist-packages/londiste/util.py is in python3-londiste 3.3.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""Misc utilities for COPY code.
"""

from __future__ import division, absolute_import, print_function

import skytools
import londiste.handler

__all__ = ['handler_allows_copy', 'find_copy_source']

def handler_allows_copy(table_attrs):
    """Decide if table is copyable based on attrs."""
    if not table_attrs:
        return True
    attrs = skytools.db_urldecode(table_attrs)
    hstr = attrs.get('handler', '')
    p = londiste.handler.build_handler('unused.string', hstr, None)
    return p.needs_table()

def find_copy_source(script, queue_name, copy_table_name, node_name, node_location):
    """Find source node for table.

    @param script: DbScript
    @param queue_name: name of the cascaded queue
    @param copy_table_name: name of the table (or list of names)
    @param node_name: target node name
    @param node_location: target node location
    @returns (node_name, node_location, downstream_worker_name) of source node
    """

    # None means no steps upwards were taken, so local consumer is worker
    worker_name = None

    if isinstance(copy_table_name, str):
        need = set([copy_table_name])
    else:
        need = set(copy_table_name)

    while 1:
        src_db = script.get_database('_source_db', connstr=node_location, autocommit=1, profile='remote')
        src_curs = src_db.cursor()

        q = "select * from pgq_node.get_node_info(%s)"
        src_curs.execute(q, [queue_name])
        info = src_curs.fetchone()
        if info['ret_code'] >= 400:
            raise skytools.UsageError("Node does not exist")

        script.log.info("Checking if %s can be used for copy", info['node_name'])

        q = "select table_name, local, table_attrs from londiste.get_table_list(%s)"
        src_curs.execute(q, [queue_name])
        got = set()
        for row in src_curs.fetchall():
            tbl = row['table_name']
            if tbl not in need:
                continue
            if not row['local']:
                script.log.debug("Problem: %s is not local", tbl)
                continue
            if not handler_allows_copy(row['table_attrs']):
                script.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs'])
                continue
            script.log.debug("Good: %s is usable", tbl)
            got.add(tbl)

        script.close_database('_source_db')

        if got == need:
            script.log.info("Node %s seems good source, using it", info['node_name'])
            return node_name, node_location, worker_name
        else:
            script.log.info("Node %s does not have all tables", info['node_name'])

        if info['node_type'] == 'root':
            raise skytools.UsageError("Found root and no source found")

        # walk upwards
        node_name = info['provider_node']
        node_location = info['provider_location']
        worker_name = info['worker_name']