This file is indexed.

/usr/lib/python3/dist-packages/anosql/core.py is in python3-anosql 0.2.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
anosql main module
"""

import os
from functools import partial
import re


class SQLLoadException(Exception):
    """Raised by ``load_queries`` when the queries file cannot be found."""


class SQLParseException(Exception):
    """Raised when a query entry does not start with ``-- name:``."""


# Statement kinds assigned by parse_sql_entry from the query name's suffix:
#   SELECT               - no suffix; rows are fetched and returned.
#   INSERT_UPDATE_DELETE - name contains "!"; executed and committed.
#   AUTO_GEN             - name contains "<!"; executed, committed and the
#                          generated id is returned.
SELECT, INSERT_UPDATE_DELETE, AUTO_GEN = 1, 2, 3


class Queries(object):
    """Container that exposes loaded queries as instance attributes.

    Each registered query becomes an attribute named after the query, and
    its name is recorded (once) in ``available_queries`` in registration
    order.
    """

    def __init__(self, queries=None):
        """Register every ``(name, fn)`` pair in *queries*.

        :param queries: iterable of ``(name, callable)`` pairs, or ``None``
            (the default) for an empty collection.
        """
        # A ``None`` sentinel avoids sharing one mutable default list
        # between every call of the constructor.
        if queries is None:
            queries = ()

        self.available_queries = []

        for name, fn in queries:
            self.add_query(name, fn)

    def __repr__(self):
        return "anosql.Queries(" + repr(self.available_queries) + ")"

    def add_query(self, name, fn):
        """Attach *fn* as attribute *name* and record the name.

        Re-adding an existing name replaces the attribute but does not
        duplicate the entry in ``available_queries``.
        """
        setattr(self, name, fn)

        if name not in self.available_queries:
            self.available_queries.append(name)


def get_fn_name(line):
    """Extract the Python-friendly query name from a ``-- name:`` line.

    The ``-- name:`` prefix is removed, surrounding whitespace (including a
    stray carriage return) is stripped and hyphens are turned into
    underscores. Any ``!`` / ``<!`` suffix is left in place for the caller
    to interpret.

    :param line: the header line of a query entry.
    :returns: the query name with ``-`` replaced by ``_``.
    """
    prefix = '-- name:'

    if line.startswith(prefix):
        # Slice by the real prefix length so the result does not depend on
        # exactly one space following the colon.
        raw_name = line[len(prefix):]
    else:
        # Lines without the prefix keep the historical fixed-width slice.
        raw_name = line[9:]

    return raw_name.strip().replace('-', '_')


def parse_sql_entry(db_type, e):
    """Turn one query entry into a ``(name, function)`` pair.

    An entry is a ``-- name: <name>`` line, an optional ``-- <doc>`` line
    and then the SQL text. The name suffix selects the statement kind:
    ``<!`` gives ``AUTO_GEN`` (and the suffix becomes ``_auto``), ``!``
    gives ``INSERT_UPDATE_DELETE`` and no suffix gives ``SELECT``.

    :param db_type: ``'sqlite'`` or ``'postgres'``; any other value leaves
        the SQL text untouched.
    :param e: the text of a single entry, lines separated by ``\\n``.
    :returns: ``(name, fn)`` where ``fn(conn, *args, **kwargs)`` runs the
        query on ``conn`` and returns the fetched rows (SELECT), the
        generated id (AUTO_GEN) or ``None``.
    :raises SQLParseException: if the first line does not start with
        ``-- name:``.
    """
    is_sqlite = db_type == 'sqlite'
    is_postgres = db_type == 'postgres'

    lines = e.split('\n')

    if not lines[0].startswith('-- name:'):
        raise SQLParseException('Query does not start with "-- name:".')

    # Strip so a stray "\r" or trailing blank never ends up in the
    # attribute name.
    name = get_fn_name(lines[0]).strip()

    if '<!' in name:
        sql_type = AUTO_GEN
        name = name.replace('<!', '_auto')
    elif '!' in name:
        sql_type = INSERT_UPDATE_DELETE
        name = name.replace('!', '')
    else:
        sql_type = SELECT

    # Slicing (rather than indexing lines[1]) keeps a name-only entry from
    # raising IndexError; it simply yields an empty query.
    doc = None
    body = lines[1:]

    if body and body[0].startswith('-- '):
        doc = body[0][3:]
        # Always drop the doc line, even when its text is empty: once the
        # lines are joined, a leftover "-- " would comment out the whole
        # statement.
        body = body[1:]

    query = ' '.join(body)

    if is_postgres and sql_type == AUTO_GEN:
        # A trailing ";" must go before the clause is appended, otherwise
        # RETURNING would land after the end of the statement.
        query = query.rstrip().rstrip(';').rstrip() + ' RETURNING id'

    if is_sqlite:
        query = query.replace('%s', '?')
    elif is_postgres:
        # Rewrite ":name" to "%(name)s". The lookbehind skips "::" casts
        # without consuming the character before the colon, so "id=:id"
        # keeps its "=". Digits are allowed after the first character.
        query = re.sub(r'(?<!:):([a-zA-Z_-][a-zA-Z0-9_-]*)', r'%(\1)s',
                       query)

    def fn(conn, *args, **kwargs):
        # Keyword arguments win over positional ones for every kind.
        params = kwargs if kwargs else args
        results = None
        cur = conn.cursor()

        try:
            if sql_type == SELECT:
                cur.execute(query, params)
                results = cur.fetchall()
            elif sql_type == INSERT_UPDATE_DELETE:
                cur.execute(query, params)
                conn.commit()
            elif is_postgres:
                # AUTO_GEN: the appended RETURNING clause yields the id.
                cur.execute(query, params)
                results = cur.fetchone()[0]
                conn.commit()
            elif is_sqlite:
                # AUTO_GEN: sqlite reports the new id on the cursor.
                cur.execute(query, params)
                results = cur.lastrowid
                conn.commit()
            # AUTO_GEN with any other db_type executes nothing and
            # returns None.
        finally:
            # Release the cursor even when execution fails.
            cur.close()

        return results

    fn.__doc__ = doc
    fn.__query__ = query
    # ``func_name`` alone does not rename a function on Python 3; set
    # ``__name__`` too and keep the old attribute for existing callers.
    fn.__name__ = name
    fn.func_name = name

    return name, fn


def parse_queries_string(db_type, s):
    """Split *s* into blank-line separated entries and parse each one.

    Windows/old-Mac line endings are normalised first, and chunks that are
    empty after stripping (caused by extra blank lines or a trailing blank
    block) are skipped instead of being handed to the parser.

    :param db_type: database flavour passed through to ``parse_sql_entry``.
    :param s: text containing zero or more query entries.
    :returns: list of ``(name, function)`` pairs, in source order.
    """
    text = s.replace('\r\n', '\n').replace('\r', '\n')
    entries = (chunk.strip() for chunk in text.split('\n\n'))

    return [parse_sql_entry(db_type, entry)
            for entry in entries if entry]


def load_queries(db_type, filename):
    """Read *filename* and return its queries as a ``Queries`` object.

    :param db_type: database flavour passed to ``parse_queries_string``.
    :param filename: path of the SQL file to load.
    :returns: a ``Queries`` instance holding every parsed query.
    :raises SQLLoadException: if the file cannot be opened or read
        (missing, a directory, not permitted, ...).
    """
    # Open directly instead of checking os.path.exists first: the check is
    # racy and lets directories / unreadable files through as raw OSError.
    try:
        with open(filename) as queries_file:
            contents = queries_file.read()
    except (IOError, OSError):
        raise SQLLoadException('Could not read file', filename)

    return Queries(parse_queries_string(db_type, contents))


def load_queries_from_string(db_type, string):
    """Parse the query entries in *string* and wrap them in ``Queries``.

    :param db_type: database flavour passed to ``parse_queries_string``.
    :param string: text containing the query entries.
    :returns: a ``Queries`` instance holding every parsed query.
    """
    return Queries(parse_queries_string(db_type, string))