This file is indexed.

/usr/share/pyshared/fusslauncher/engine.py is in fuss-launcher 0.5-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
# -*- python -*-
#
# fuss-launcher backend
#
# Copyright (C) 2010 Enrico Zini <enrico@truelite.it>
# Copyright (C) 2010 Christopher R. Gabriel <cgabriel@truelite.it>
# Copyright (C) 2010 The Fuss Project <info@fuss.bz.it>
#
# Authors: Christopher R. Gabriel <cgabriel@truelite.it>
#          Enrico Zini <enrico@truelite.it>
#
# Sponsored by the Fuss Project: http://www.fuss.bz.it/
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import os, os.path
import sys
from xdg.DesktopEntry import DesktopEntry
from xdg import Locale
import xapian
from subprocess import Popen, PIPE
import math
import axi

import gettext
# Translation helper: `_` is an alias for gettext.gettext
_ = gettext.gettext

# Module-wide switches. `debug` enables the "engine: ..." trace messages that
# this module writes to stderr; `verbose` is not read by any code visible in
# this file.
debug = False
verbose = False

# Disabled workaround, kept for reference (apparently meant to silence the
# "apt API not stable yet" warning around `import apt`):
# Yes, apt, thanks, I know, the api isn't stable, thank you so very much
#warnings.simplefilter('ignore', FutureWarning)
#import warnings
#warnings.filterwarnings("ignore","apt API not stable yet")
#import apt
#warnings.resetwarnings()

# Path of the ".fuss-launcher" directory under $HOME. Reading
# os.environ['HOME'] raises KeyError at import time when HOME is not set.
# No code visible in this file uses HOMEDIR.
HOMEDIR=os.path.join(os.environ['HOME'], ".fuss-launcher")

class CompletionFilter(xapian.ExpandDecider):
    """
    Expand decider selecting the terms that may be offered as completions.

    A term is accepted when all of these hold:
     - it is at least four characters long;
     - its first character is lowercase;
     - it starts with ``prefix``, when a prefix was given;
     - its stemmed form is not the stemmed form of an excluded word.
    """

    def __init__(self, stemmer=None, exclude=None, prefix=None):
        """
        @param stemmer callable mapping a word to its stem; when missing,
               words are compared unchanged
        @param exclude iterable of words that must not be suggested
        @param prefix if not None, only terms starting with it are accepted
        """
        super(CompletionFilter, self).__init__()
        # Fall back to the identity function when there is no stemmer
        if stemmer:
            self.stem = stemmer
        else:
            self.stem = lambda word: word
        # Stem the excluded words once, so that filtering a term only needs
        # a set lookup
        self.exclude = set()
        if exclude:
            self.exclude.update(self.stem(word) for word in exclude)
        self.prefix = prefix

    def __call__(self, term):
        """Return True if ``term`` is acceptable as a completion."""
        if len(term) < 4:
            return False
        # Don't complete with special things
        if not term[0].islower():
            return False
        if self.prefix is not None and not term.startswith(self.prefix):
            return False
        return self.stem(term) not in self.exclude

class TagFilter(xapian.ExpandDecider):
    """
    Expand decider that accepts only tag terms, i.e. the terms that start
    with the "XT" prefix.
    """

    def __call__(self, term):
        """Return True if ``term`` starts with "XT"."""
        return term.startswith("XT")

class InstalledFilter(xapian.MatchDecider):
    """
    Match decider that accepts only the documents of installed packages.

    The document data is used as the package name, and the package counts as
    installed when /var/lib/dpkg/info/<name>.list exists.
    """

    def __call__(self, doc):
        """Return True if the dpkg file list for ``doc`` exists."""
        package = doc.get_data()
        list_file = "/var/lib/dpkg/info/%s.list" % package
        return os.path.exists(list_file)

class Database(object):
    """
    Query front-end over a Xapian index.

    It keeps track of the last query set with set_query() and uses it to
    provide matching documents, spelling suggestions, tab completions and a
    tag cloud.

    Attributes:
     - db: the xapian.Database being queried
     - stem: xapian.Stem for the first supported locale language, or None
     - filter: optional xapian.Query that is ANDed to every query
     - matchdecider: optional xapian.MatchDecider used when fetching results
    """
    BOOLWORDS = set(["and", "or", "not"])

    # Index term prefixes paired with the query string prefixes registered
    # on the query parsers (see _make_query_parser)
    _USER_PREFIXES = (
        ("XDT", "cat:"),
        ("XT", "tag:"),
        ("XS", "sec:"),
        ("XP", "pkg:"),
    )

    def __init__(self, path, filter=None):
        """
        Open the index and prepare query parsers and enquire engines.

        @param path pathname of the Xapian index
        @param filter optional xapian.Query that restricts every query
        """
        # Index
        self.db = xapian.Database(path)

        # Stemmer based on current locale: use the first language for which
        # Xapian provides a stemmer
        self.stem = None
        for lang in Locale.langs:
            try:
                self.stem = xapian.Stem(lang.split("_")[0])
                break
            except xapian.InvalidArgumentError:
                # No stemmer for this language: try the next one
                continue

        # Global query filter
        self.filter = filter

        # Global match decider
        self.matchdecider = None

        # Query parser for main queries
        self.qp = self._make_query_parser()

        # Enquire engine
        self.enquire = xapian.Enquire(self.db)

        # Identical query parser and enquire to use for tab completion
        # queries
        self.compl_qp = self._make_query_parser()
        self.compl_enquire = xapian.Enquire(self.db)

        # Start with an empty query
        self.set_query("")

    def _make_query_parser(self):
        "Build a query parser configured with our stemmer and prefixes"
        qp = xapian.QueryParser()
        qp.set_default_op(xapian.Query.OP_AND)
        qp.set_database(self.db)
        if self.stem:
            qp.set_stemmer(self.stem)
        qp.set_stemming_strategy(xapian.QueryParser.STEM_SOME)
        qp.add_prefix("pkg", "XP")
        qp.add_boolean_prefix("tag", "XT")
        qp.add_boolean_prefix("sec", "XS")
        qp.add_boolean_prefix("cat", "XDT")
        return qp

    def _apply_context(self, query):
        """
        Combine a parsed query with the current tag query (AND), the extra
        query (OR) and the global filter (AND), in this order
        """
        if self.query_tags is not None:
            query = xapian.Query(xapian.Query.OP_AND, query, self.query_tags)
        if self.query_extra is not None:
            query = xapian.Query(xapian.Query.OP_OR, query, self.query_extra)
        if self.filter:
            query = xapian.Query(xapian.Query.OP_AND, query, self.filter)
        return query

    def _relevance_set(self, enquire):
        "Build an rset with the top 10 results of the given enquire"
        rset = xapian.RSet()
        for match in enquire.get_mset(0, 10, None, self.matchdecider):
            rset.add_document(match.docid)
        return rset

    def unprefix(self, term):
        "Convert DB prefixes to user prefixes"
        for db_prefix, user_prefix in self._USER_PREFIXES:
            if term.startswith(db_prefix):
                return user_prefix + term[len(db_prefix):]
        return term

    def set_query(self, query, qtags=None, extra=None):
        """
        Set query string and optional tag filter

        @param query the query string typed by the user
        @param qtags optional sequence of tag names that all results must
               have
        @param extra optional xapian.Query that is ORed to the query
        """
        # Work on an encoded byte string. Strings that are already encoded
        # are left alone: encoding them again would make Python 2 decode
        # them as ASCII first, and fail on non-ASCII input
        if not isinstance(query, str):
            query = query.encode("UTF-8")
        self.query_string = query

        flags = (xapian.QueryParser.FLAG_BOOLEAN |
                 xapian.QueryParser.FLAG_LOVEHATE |
                 xapian.QueryParser.FLAG_BOOLEAN_ANY_CASE |
                 xapian.QueryParser.FLAG_WILDCARD |
                 xapian.QueryParser.FLAG_PURE_NOT |
                 xapian.QueryParser.FLAG_SPELLING_CORRECTION |
                 xapian.QueryParser.FLAG_AUTO_SYNONYMS)
        # Only match the last word as a partial word once there is enough
        # text typed
        if len(query) > 2:
            flags |= xapian.QueryParser.FLAG_PARTIAL
        parsed = self.qp.parse_query(query, flags)

        # Remember tags and extra query: completions() and tagcloud() need
        # them to work in the same context as the main query
        if qtags:
            self.query_tags = xapian.Query(xapian.Query.OP_AND, ["XT" + t for t in qtags])
        else:
            self.query_tags = None
        self.query_extra = extra if extra else None

        full_query = self._apply_context(parsed)
        if debug:
            sys.stderr.write("engine: setting query %s\n" % (full_query,))
        self.enquire.set_query(full_query)

    def spellcheck(self):
        """
        Get spelling suggestions on the last query string passed to set_query
        """
        return self.qp.get_corrected_query_string()

    def documents(self, first=0, count=20, mdecider=None):
        """
        Get packages matching the last query set with set_query, sorted by
        decreasing relevance

        @param first index of the first result to return
        @param count maximum number of results to return
        @param mdecider match decider to use instead of self.matchdecider
        @returns a list of (percent score, xapian document) tuples
        """
        if mdecider is None:
            mdecider = self.matchdecider
        return [(m.percent, m.document)
                for m in self.enquire.get_mset(first, count, None, mdecider)]

    def completion_prepare(self, qstring, has_extras=False):
        """
        Scan a query string extracting the useful parts for tag completion
        expansion.

        @param qstring the query string
        @param has_extras True if there is something else, other than the query
               string, that will be part of the final xapian query

        @returns prefix, lead, query, blacklist
         - prefix is used to filter completion results, only keeping the ones
           that start with the given prefix. It is None if there is no
           partially typed word and we should be suggesting new words.
         - lead is a string to prepend to the completion results to obtain a
           completed query string;
         - query is the string to use to build a query to generate completion
           results. Can be None if there is no context available to build a
           query from.
         - blacklist is a set of terms not to use for completion
        """
        # Tokenize the query string
        args = qstring.split()

        # Remove the trailing term in case it is partially typed
        if qstring and not qstring[-1].isspace():
            prefix = args.pop()
        else:
            prefix = None

        # Remove the trailing boolean terms in order to build a reasonable
        # query string
        while args and args[-1].lower() in self.BOOLWORDS:
            args.pop()

        # Check if we have no context at all for which to do expansion
        if not has_extras and not args:
            return prefix, "", None, set()

        # Words already in the query are not worth suggesting again
        blacklist = set(x for x in args if x.lower() not in self.BOOLWORDS)

        # The lead is the query string without the partially typed word
        if prefix:
            lead = qstring[:-len(prefix)]
        else:
            lead = qstring
        return prefix, lead, " ".join(args), blacklist

    def completions(self):
        """
        Compute completions for the current query string

        @returns a list of complete query strings, each made of what has been
                 typed so far with the last word completed
        """
        # Compute the strategy to use for completion
        prefix, lead, query, blacklist = self.completion_prepare(self.query_string, self.filter is not None)

        if not query and not prefix:
            # Show a preset list of tags
            return []

        if not query:
            # Simple prefix search
            res = set(str(x) for x in self.db.synonym_keys(prefix) if x not in blacklist)
            # allterms yields term list items: the blacklist has to be
            # checked against their .term string
            res.update(item.term for item in self.db.allterms(prefix) if item.term not in blacklist)
            return [lead + x for x in sorted(res)]

        # Query for completion terms
        query = self.compl_qp.parse_query(query,
                xapian.QueryParser.FLAG_BOOLEAN |
                xapian.QueryParser.FLAG_LOVEHATE |
                xapian.QueryParser.FLAG_BOOLEAN_ANY_CASE |
                xapian.QueryParser.FLAG_WILDCARD |
                xapian.QueryParser.FLAG_PURE_NOT |
                xapian.QueryParser.FLAG_AUTO_SYNONYMS |
                xapian.QueryParser.FLAG_SPELLING_CORRECTION)
        self.compl_enquire.set_query(self._apply_context(query))

        # Build an rset with the top 10 results
        rset = self._relevance_set(self.compl_enquire)

        # Compute tab completions
        cfilter = CompletionFilter(stemmer=self.stem, exclude=blacklist, prefix=prefix)
        completions = [self.unprefix(r.term)
                       for r in self.compl_enquire.get_eset(15, rset, cfilter)]

        # TODO: if partial, add synonyms (it seems it cannot be done)
        return [lead + c for c in completions]

    def tagcloud(self):
        """
        Compute a context-sensitive tag cloud based on the last query set via
        set_query.

        @returns a list of (tag, score) tuples sorted by tag name, with at
                 most 15 elements
        """
        if not self.query_string and not self.query_tags:
            # Generate an initial tag cloud with the most used tags.
            # NOTE(review): math.log fails if a tag has a collection
            # frequency of 0; confirm that the indexer never adds tags with
            # a zero wdf
            tags = [(x.term[2:], math.log(self.db.get_collection_freq(x.term)))
                    for x in self.db.allterms("XT")]
            if not tags:
                # No tags in the index: nothing to show and nothing to
                # normalise
                return []
            tags.sort(key=lambda x: x[1], reverse=True)
            tags = tags[:15]
            # Normalise the scores in the interval [0, 1]
            minscore = float(min(x[1] for x in tags)) / 2
            maxscore = float(max(x[1] for x in tags)) - minscore
            if maxscore:
                tags = [(x[0], float(x[1] - minscore) / maxscore) for x in tags]
            else:
                # All scores are 0 (every tag is used only once): they all
                # weigh the same
                tags = [(x[0], 1.0) for x in tags]
            tags.sort(key=lambda x: x[0])
            # TODO: return some precomputed set of keywords/tags for
            # completions, like the toplevel facets in axi-cache
            return tags

        # Build an rset with the top 10 results
        rset = self._relevance_set(self.enquire)

        # Compute tag cloud
        tags = []
        maxscore = None
        for res in self.enquire.get_eset(15, rset, TagFilter()):
            # Normalise the score using the score of the first result
            weight = math.log(res.weight)
            if maxscore is None:
                maxscore = weight
            # A first weight of exactly 1 has a log of 0: avoid dividing by
            # it
            if maxscore:
                score = float(weight) / maxscore
            else:
                score = 1.0
            tags.append((res.term[2:], score))
        tags.sort(key=lambda x: x[0])
        return tags

class Engine(object):
    """
    Search engine for the launcher.

    It wraps a Database opened on the index at axi.XAPIANINDEX and restricted
    to the documents that have the "XD" term, and it returns .desktop file
    names instead of Xapian documents.
    """

    def __init__(self):
        """
        Open the index.

        Raises RuntimeError (via create_main_index) if the index does not
        exist.
        """
        if not os.path.exists(axi.XAPIANINDEX):
            self.create_main_index()
        # Only query packages that have .desktop files
        self.db = Database(axi.XAPIANINDEX, filter=xapian.Query("XD"))

    def set_install_only(self, val):
        """
        If val is true, only return results for installed packages;
        otherwise return results for all packages.
        """
        if debug:
            sys.stderr.write("engine: set install_only %s\n" % (val,))
        if val:
            self.db.matchdecider = InstalledFilter()
        else:
            self.db.matchdecider = None

    def set_query(self, query, qtags=None):
        """
        Set the query string and an optional sequence of tags that all
        results must have.
        """
        self.db.set_query(query, [] if qtags is None else qtags)

    def spellcheck(self):
        "Get spelling suggestions for the last query string set"
        return self.db.spellcheck()

    def documents(self, first=0, count=20):
        """
        Get the results of the last query set.

        first and count select a range of matching packages. Every package
        contributes one result per .desktop file that it has, so the list
        can be longer or shorter than count.

        @returns a list of (score, .desktop file name) tuples
        """
        res = []
        for score, doc in self.db.documents(first, count):
            # Here we get packages, we need to look for XDFname to get .desktop files
            for term in doc.termlist():
                if term.term.startswith("XDF"):
                    res.append((score, term.term[3:]))
        return res

    def completions(self):
        "Compute completions for the current query string"
        #if self.axi:
        #    return self.axi.completions()
        #else:
        return self.db.completions()

    def tagcloud(self):
        "Compute a tag cloud for the current query"
        # Nothing in this class sets self.axi: look it up defensively so
        # that a missing attribute falls back to the database instead of
        # raising AttributeError
        backend = getattr(self, "axi", None)
        if backend:
            return backend.tagcloud()
        return self.db.tagcloud()

    def create_main_index(self):
        """
        Create the main index if it is missing.

        Not implemented: it always raises RuntimeError asking to build the
        index by hand.
        """
        # Create the database directory if missing

        # TODO: rebuild using the DBUS interface
        raise RuntimeError("please run update-apt-xapian-index as root")