/usr/bin/axi-cache is in apt-xapian-index 0.49.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
#!/usr/bin/python
# coding: utf-8
#
# axi-cache - Query apt-xapian-index database
#
# Copyright (C) 2010 Enrico Zini <enrico@debian.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# TODO:
# - save NAME save the query to be recalled later with @NAME (or if notmuch
# has a syntax for saved queries, recycle it)
from optparse import OptionParser
from cStringIO import StringIO
import sys
import os, os.path
import axi
# Version string reported by --version
VERSION = "0.47"

# Setup configuration

# Debtags vocabulary file, read to describe facets and tags
DEBTAGS_VOCABULARY = "/var/lib/debtags/vocabulary"
#XDG_CONFIG_HOME = os.environ.get("XDG_CONFIG_HOME", os.expanduser("~/.config"))

# Per-user cache directory: $XDG_CACHE_HOME when set, ~/.cache otherwise
if "XDG_CACHE_HOME" in os.environ:
    XDG_CACHE_HOME = os.environ["XDG_CACHE_HOME"]
else:
    XDG_CACHE_HOME = os.path.expanduser("~/.cache")

# File where the state of the last search is kept between runs
CACHEFILE = os.path.join(XDG_CACHE_HOME, "axi-cache.state")
# Optional dependencies: when any of them is missing the script degrades to
# a help-only mode (helponly = True), so that help2man can still run it.
try:
    from ConfigParser import RawConfigParser
    import re
    import math
    import xapian
    import apt
    try:
        from debian import deb822
    except ImportError:
        # Fall back to the alternative module name
        from debian_bundle import deb822
except ImportError as e:
    sys.stderr.write("%s: only help functions are implemented, for the sake of help2man\n" % str(e))
    helponly = True
else:
    helponly = False
if not helponly:
class SavedStateError(Exception):
    """Raised when the saved search state cannot be used (e.g. no saved query)."""
class AptSilentProgress(apt.progress.text.OpProgress):
    """Quiet progress so we don't get cache loading messages from Apt"""

    def __init__(self):
        """Do nothing; the base class constructor is deliberately not called."""

    def done(self):
        """Ignore the end-of-operation notification."""

    def update(self, percent=None):
        """Ignore progress updates."""
def readVocabulary():
    """
    Read the Debtags vocabulary from DEBTAGS_VOCABULARY.

    Return a (facets, tags) pair of dicts mapping facet names and tag names
    to their deb822 paragraphs.

    If the vocabulary file cannot be opened, print a message to stderr and
    return (None, None).
    """
    try:
        fin = open(DEBTAGS_VOCABULARY)
    except (IOError, OSError) as e:
        # Only show this when being verbose
        sys.stderr.write("Cannot read %s: %s. Please install `debtags' to get tag descriptions\n" % (DEBTAGS_VOCABULARY, str(e)))
        return None, None
    facets = dict()
    tags = dict()
    try:
        for entry in deb822.Deb822.iter_paragraphs(fin):
            if "Facet" in entry:
                facets[entry["Facet"]] = entry
            elif "Tag" in entry:
                tags[entry["Tag"]] = entry
    finally:
        # Do not leak the file handle, even if parsing fails
        fin.close()
    return facets, tags
class DB(object):
class BasicFilter(xapian.ExpandDecider):
    """
    Expand decider used to pick suggested terms.

    A term is accepted when it is at least 4 characters long, matches the
    optional prefix (after skipping its leading uppercase characters), its
    stemmed form is not in the excluded set, and it either starts with
    "XT" or "XS" or begins with a lowercase character.
    """

    def __init__(self, stemmer=None, exclude=None, prefix=None):
        super(DB.BasicFilter, self).__init__()
        # Without a stemmer, terms are compared unchanged
        if stemmer:
            self.stem = stemmer
        else:
            self.stem = lambda x: x
        # The exclusion list is kept in stemmed form
        if exclude:
            self.exclude = set([self.stem(word) for word in exclude])
        else:
            self.exclude = set()
        self.prefix = prefix

    def __call__(self, term):
        if len(term) < 4:
            return False
        if self.prefix is not None:
            # Skip leading uppercase chars
            start = 0
            while start < len(term) and term[start].isupper():
                start += 1
            if not term.startswith(self.prefix, start):
                return False
        if self.stem(term) in self.exclude:
            return False
        if term.startswith(("XT", "XS")):
            return True
        return term[0].islower()
class TermFilter(BasicFilter):
    """
    Expand decider accepting only plain terms: at least 4 characters long,
    not excluded (after stemming) and starting with a lowercase character.
    """

    def __call__(self, term):
        too_short = len(term) < 4
        if too_short or self.stem(term) in self.exclude:
            return False
        return term[0].islower()
class TagFilter(xapian.ExpandDecider):
    """Expand decider accepting only terms carrying the "XT" prefix."""

    def __call__(self, term):
        return term[:2] == "XT"
def __init__(self):
    """
    Open the Xapian index, set up the query parser and load the state
    saved by previous runs from CACHEFILE, if it exists.
    """
    # Access the Xapian index
    self.db = xapian.Database(axi.XAPIANINDEX)
    self.stem = xapian.Stem("english")

    # Build query parser
    parser = xapian.QueryParser()
    self.qp = parser
    parser.set_default_op(xapian.Query.OP_AND)
    parser.set_database(self.db)
    parser.set_stemmer(self.stem)
    parser.set_stemming_strategy(xapian.QueryParser.STEM_SOME)
    parser.add_prefix("pkg", "XP")
    for user_prefix, db_prefix in (("tag", "XT"), ("sec", "XS")):
        parser.add_boolean_prefix(user_prefix, db_prefix)
    #notmuch->value_range_processor = new Xapian::NumberValueRangeProcessor (NOTMUCH_VALUE_TIMESTAMP);
    #notmuch->query_parser->add_valuerangeprocessor (notmuch->value_range_processor);

    # Read state from previous runs
    self.cache = RawConfigParser()
    if os.path.exists(CACHEFILE):
        try:
            self.cache.read(CACHEFILE)
        except Exception as e:
            # An unreadable state file is reported and then ignored
            sys.stderr.write(str(e) + "\n")
            sys.stderr.write("ignoring %s which seems to be corrupted\n" % CACHEFILE)

    # True when the in-memory state has changes that save() should write
    self.dirty = False
    # Vocabulary data, loaded lazily by vocabulary()
    self.facets = None
    self.tags = None
def save(self):
    """
    Save the state so we find it next time.

    Nothing is written unless the state has been marked dirty. The cache
    directory XDG_CACHE_HOME is created (mode 0700) if it does not exist.
    """
    if not self.dirty:
        return
    if not os.path.exists(XDG_CACHE_HOME):
        os.makedirs(XDG_CACHE_HOME, mode=0o700)
    # Close the file explicitly so the state is flushed before we return,
    # rather than whenever the file object happens to be collected
    with open(CACHEFILE, "w") as fd:
        self.cache.write(fd)
    self.dirty = False
def vocabulary(self):
    """
    Return the (facets, tags) vocabulary pair, loading it with
    readVocabulary() as long as no facets have been loaded yet.
    """
    if self.facets is not None:
        return self.facets, self.tags
    self.facets, self.tags = readVocabulary()
    return self.facets, self.tags
def unprefix(self, term):
    """
    Convert DB prefixes to user prefixes.

    "XT", "XS" and "XP" become "tag:", "sec:" and "pkg:" respectively;
    any other term is returned unchanged.
    """
    for db_prefix, user_prefix in (("XT", "tag:"), ("XS", "sec:"), ("XP", "pkg:")):
        if term.startswith(db_prefix):
            return user_prefix + term[2:]
    return term
def is_tag(self, t):
    """Tell whether the index contains the term "XT" + t."""
    tag_term = "XT" + t
    return self.db.term_exists(tag_term)
def set_query_args(self, args, secondary=False):
    """
    Set the query from a list of command line arguments.

    Arguments containing "::" that exist in the index as tags get a
    "tag:" prefix; the words are then joined with spaces and stored with
    set_query_string().
    """
    words = []
    for arg in args:
        if "::" in arg and self.is_tag(arg):
            words.append("tag:" + arg)
        else:
            words.append(arg)
    self.set_query_string(" ".join(words), secondary=secondary)
def set_query_string(self, q, secondary=False):
    """
    Set the query in the cache.

    A primary query replaces the saved query and drops any secondary
    query; a secondary query is stored next to the primary one. In both
    cases the pagination offset ("skip") is reset.
    """
    if secondary:
        self.set_cache_last("secondary query", q)
    else:
        self.set_cache_last("query", q)
        self.unset_cache_last("secondary query")
    self.unset_cache_last("skip")
def set_sort(self, key=None, cutoff=60):
    """
    Set sorting method (default is by relevance).

    With key set to None the saved sort key and cutoff are removed;
    otherwise both are stored. The pagination offset is always reset.
    """
    if key is not None:
        self.set_cache_last("sort", key)
        self.set_cache_last("cutoff", str(cutoff))
    else:
        for option in ("sort", "cutoff"):
            self.unset_cache_last(option)
    self.unset_cache_last("skip")
def build_query(self):
    """
    Build query from cached query info.

    Sets self.query and self.enquire from the saved primary query,
    AND-ed with the saved secondary query when there is one, and applies
    the saved sort order.

    Raises SavedStateError if there is no saved query.
    """
    primary = self.get_cache_last("query")
    if not self.cache.has_option("last", "query"):
        raise SavedStateError("no saved query")

    # The same parser flags are used for both queries
    parse_flags = (
        xapian.QueryParser.FLAG_BOOLEAN |
        xapian.QueryParser.FLAG_LOVEHATE |
        xapian.QueryParser.FLAG_BOOLEAN_ANY_CASE |
        xapian.QueryParser.FLAG_WILDCARD |
        xapian.QueryParser.FLAG_PURE_NOT |
        xapian.QueryParser.FLAG_SPELLING_CORRECTION |
        xapian.QueryParser.FLAG_AUTO_SYNONYMS)

    self.query = self.qp.parse_query(primary, parse_flags)

    extra = self.get_cache_last("secondary query", None)
    if extra:
        extra = self.qp.parse_query(extra, parse_flags)
        self.query = xapian.Query(xapian.Query.OP_AND, self.query, extra)

    # print "Query:", self.query

    # Build the enquire with the query
    self.enquire = xapian.Enquire(self.db)
    self.enquire.set_query(self.query)

    sort = self.get_cache_last("sort")
    if sort is None:
        return

    values, descs = axi.readValueDB()

    # If we don't sort by relevance, we need to specify a cutoff in order to
    # remove poor results from the output
    #
    # Note: ept-cache implements an adaptive cutoff as follows:
    # 1. Retrieve only one result, with default sorting. Read its relevance as
    #    the maximum relevance.
    # 2. Set the cutoff as some percentage of the maximum relevance
    # 3. Set sort by the wanted value
    # 4. Perform the query
    # TODO: didn't this use to work?
    #self.enquire.set_cutoff(int(self.get_cache_last("cutoff", 60)))

    # A '-' at either end of the sort key requests reverse order
    reverse = '-' in (sort[0], sort[-1])
    sort = sort.strip('-')

    # Sort by the requested value
    self.enquire.set_sort_by_value(values[sort], reverse)
def get_matches(self, first=None, count=20):
    """
    Return a Xapian mset with the next page of results.

    When first is None the page starts at the saved "skip" offset (0 if
    there is none). The start of this page is saved as "lastfirst" and
    the start of the following page as "skip".
    """
    start = first
    if start is None:
        start = int(self.get_cache_last("skip", 0))
    self.set_cache_last("lastfirst", start)
    self.set_cache_last("skip", start + count)
    return self.enquire.get_mset(start, count)
def get_all_matches(self, first=None):
    """
    Generate Xapian matches for all query matches.

    Matches are fetched 100 at a time, starting at first (or at the saved
    "skip" offset when first is None), until an empty page is returned.
    The saved pagination state ("lastfirst" and "skip") is removed.
    """
    page_size = 100
    offset = first
    if offset is None:
        offset = int(self.get_cache_last("skip", 0))
    for option in ("lastfirst", "skip"):
        self.unset_cache_last(option)
    page = self.enquire.get_mset(offset, page_size)
    while page.size() != 0:
        for match in page:
            yield match
        offset += page_size
        page = self.enquire.get_mset(offset, page_size)
def get_spelling_correction(self):
    """Return the corrected query string reported by the query parser."""
    corrected = self.qp.get_corrected_query_string()
    return corrected
def get_suggestions(self, count=10, filter=None):
    """
    Compute suggestions for more terms

    Return a Xapian ESet with at most count terms, restricted by filter
    (a BasicFilter with default settings when filter is None).
    """
    # Use the first 30 results as the key ones to use to compute relevant
    # terms
    relevant = xapian.RSet()
    for match in self.enquire.get_mset(0, 30):
        relevant.add_document(match.docid)

    # Get results, optionally filtered
    decider = self.BasicFilter() if filter is None else filter
    return self.enquire.get_eset(count, relevant, decider)
# ConfigParser access wrappers with lots of extra ifs, needed because the
# ConfigParser API has been designed to throw exceptions in the most stupid
# places one could possibly conceive
def get_cache_last(self, key, default=None):
    """Return option key of the "last" section, or default if it is not set."""
    if not self.cache.has_option("last", key):
        return default
    return self.cache.get("last", key)
def set_cache_last(self, key, val):
    """
    Set option key of the "last" section to val, creating the section if
    needed, and mark the state as dirty.
    """
    state = self.cache
    if not state.has_section("last"):
        state.add_section("last")
    state.set("last", key, val)
    self.dirty = True
def unset_cache_last(self, key):
    """
    Remove option key from the "last" section, if present.

    The state is marked dirty only when an option was actually removed,
    so that a removal alone is enough for save() to write the change.
    """
    if not self.cache.has_section("last"):
        return
    # remove_option() reports whether the option existed
    if self.cache.remove_option("last", key):
        self.dirty = True
def get_rdeps(self, name, pfx):
    """
    Return all the rdeps of type @pfx for package @name

    Generates the document data of every document matching the term
    pfx+name, fetching the results one page at a time.
    """
    enquire = xapian.Enquire(self.db)
    enquire.set_query(xapian.Query(pfx+name))
    page_size = 20
    first = 0
    while True:
        found = 0
        # The second argument of get_mset is the maximum number of items
        # to return, not the index of the last one: asking for
        # first + page_size items made consecutive pages overlap and
        # produced duplicate results.
        for m in enquire.get_mset(first, page_size):
            found += 1
            yield m.document.get_data()
        # A short page means there is nothing left to fetch
        if found < page_size:
            break
        first += page_size
class Cmdline(object):
# Boolean query operators (compared in lowercase form by the code using them)
BOOLWORDS = set(("and", "or", "not"))
def __init__(self):
    """
    Build the option parser and parse the command line, storing the
    results in self.opts and self.args.
    """
    self.name = "axi-cache"

    class Parser(OptionParser):
        """OptionParser whose help output also lists the available commands."""

        def __init__(self, cmdline, *args, **kwargs):
            OptionParser.__init__(self, *args, **kwargs)
            self.cmdline = cmdline

        def error(self, msg):
            # Report the error, show the full help and exit with status 2
            prog = self.get_prog_name()
            sys.stderr.write("%s: error: %s\n\n" % (prog, msg))
            self.print_help(sys.stderr)
            sys.exit(2)

        def print_help(self, fd=sys.stdout):
            # Render the command summary provided by the Cmdline object
            commands = StringIO()
            self.cmdline.format_help(commands)
            # Render the standard help, calling the base class explicitly
            # (original note: "Still in 2010 Cmdline is not an object. Oh dear.")
            generic = StringIO()
            OptionParser.print_help(self, generic)
            # The ENDDESC marker in the description is where the command
            # summary (minus its trailing newline) gets inserted
            text = generic.getvalue().replace("ENDDESC", "\n\n" + commands.getvalue()[:-1])
            fd.write(text)

    parser = Parser(self, usage="usage: %prog [options] command [args]",
                    version="%prog "+ VERSION,
                    description="Query the Apt Xapian index.ENDDESC")
    self.parser = parser
    parser.add_option("-s", "--sort", help="sort by the given value, as listed in %s. Add a '-' to reverse sort order" % axi.XAPIANDBVALUES)
    parser.add_option("--tags", action="store_true", help="show matching tags, rather than packages")
    parser.add_option("--tabcomplete", action="store", metavar="TYPE", help="suggest words for tab completion of the current command line (type is 'plain' or 'partial')")
    parser.add_option("--last", action="store_true", help="use 'show --last' to limit tab completion to only the packages from the last search results")
    parser.add_option("--all", action="store_true", help="disable pagination and always show all results. Note that search results are normally sorted by relevance, so you may find meaningless results at the end of the output")

    self.opts, self.args = parser.parse_args()

    if self.opts.tags and self.opts.all:
        parser.error("--tags conflicts with --all")
def tabcomplete_query_args(self):
    """
    Split the command line arguments for tab completion.

    Return (queryargs, partial): in "partial" mode the last argument is
    the word being completed and is returned as partial, otherwise
    partial is None. Trailing boolean operators are dropped from
    queryargs.
    """
    if self.opts.tabcomplete == "partial" and self.args:
        queryargs, partial = self.args[:-1], self.args[-1]
    else:
        queryargs, partial = self.args, None

    # Remove trailing boolean terms
    keep = len(queryargs)
    while keep > 0 and queryargs[keep - 1].lower() in self.BOOLWORDS:
        keep -= 1
    if keep != len(queryargs):
        queryargs = queryargs[:keep]
    return queryargs, partial
def do_info(self, args):
    "info: print information about the apt-xapian-index environment"
    # The docstring above is parsed by format_help() as "usage: description",
    # so it has to stay a single line; further notes live in comments.
    #
    # Prints the index locations, the status of every plugin found in the
    # plugin directory, the values stored in the index and the data sources
    # used by the plugins. args is not used. Returns 0.
    import time
    import datetime
    import axi.indexer

    # General info
    print("Main data directory: %s" % axi.XAPIANDBPATH)
    try:
        cur_timestamp = os.path.getmtime(axi.XAPIANDBSTAMP)
        cur_time = time.strftime("%c", time.localtime(cur_timestamp))
        cur_time = "last updated: " + cur_time
    except (OSError, IOError) as e:
        # No timestamp file: the index has not been built yet
        cur_timestamp = 0
        cur_time = "not available: " + str(e)
    print("Update timestamp: %s (%s)" % (axi.XAPIANDBSTAMP, cur_time))
    try:
        fd = open(axi.XAPIANINDEX)
        try:
            # The location is whatever follows the first space in the file
            index_loc = fd.read().split(" ", 1)[1].strip()
        finally:
            fd.close()
        index_loc = "pointing to " + index_loc
    except (OSError, IOError, IndexError) as e:
        index_loc = "not available: " + str(e)
    print("Index location: %s (%s)" % (axi.XAPIANINDEX, index_loc))

    def fileinfo(fname):
        if os.path.exists(fname):
            return fname
        else:
            return fname + " (available from next reindex)"

    print("Documentation of index contents: %s" % fileinfo(axi.XAPIANDBDOC))
    print("Documentation of available prefixes: %s" % fileinfo(axi.XAPIANDBPREFIXES))
    print("Documentation of available values: %s" % fileinfo(axi.XAPIANDBVALUES))
    print("Plugin directory: %s" % axi.PLUGINDIR)

    # Aggregated plugin information
    # { name: { path=path, desc=desc, status=status } }
    plugins = dict()

    # Aggregated value information
    # { valuename: { val=num, desc=shortdesc, plugins=[plugin names] } }
    values, descs = axi.readValueDB()
    values = dict((a, dict(val=b, desc=descs[a], plugins=[])) for a, b in values.items())

    # Aggregated data source information
    # { pathname: { desc=shortdesc, plugins=[plugin names] } }
    sources = dict()

    # Per-plugin info
    all_plugins_names = set(x for x in os.listdir(axi.PLUGINDIR) if x.endswith(".py"))
    for plugin in axi.indexer.Plugins():
        # discard() rather than remove(): do not fail on a plugin whose
        # file is not among the names listed above
        all_plugins_names.discard(plugin.filename)
        doc = plugin.obj.doc() or dict()

        # Last update status / whether an update is due
        # TODO: check if the data from the plugin is present in the index
        ts = plugin.info["timestamp"]
        if cur_timestamp == 0:
            status = "enabled, not indexed"
        elif ts == 0:
            status = "enabled, up to date"
        elif ts <= cur_timestamp:
            delta = datetime.timedelta(seconds=cur_timestamp-ts)
            status = "enabled, up to date (%s older than index)" % str(delta)
        else:
            delta = datetime.timedelta(seconds=ts-cur_timestamp)
            status = "enabled, needs indexing (%s newer than index)" % str(delta)
        desc = doc.get("shortDesc", "(description unavailable)")

        plugins[plugin.name] = dict(
            path=os.path.join(axi.PLUGINDIR, plugin.filename),
            desc=desc,
            status=status,
        )

        # Aggregate info about values
        for v in plugin.info.get("values", []):
            name = v.get("name", None)
            if name is None: continue
            if name not in values:
                values[name] = dict(val=None, desc=v.get("desc", "(unknown)"), plugins=[])
            values[name].setdefault("plugins", []).append(plugin.name)

        # Data files used by the plugin
        for s in plugin.info.get("sources", []):
            path = s.get("path", "(unknown)")
            if path not in sources:
                sources[path] = dict(desc=s.get("desc", "(unknown)"), plugins=[])
            sources[path].setdefault("plugins", []).append(plugin.name)

    # Disabled plugins: whatever is left in the plugin directory
    for filename in sorted(all_plugins_names):
        name = os.path.splitext(filename)[0]
        plugins[name] = dict(
            path=os.path.join(axi.PLUGINDIR, filename),
            desc="(plugin disabled, description unavailable)",
            status="disabled",
        )

    # Plugin information
    # { name: { path=path, desc=desc, status=status } }
    #print "Plugins:"
    #maxname = max((len(x) for x in plugins.iterkeys()))
    #for name, info in sorted(plugins.iteritems(), key=lambda x:x[0]):
    #    print " %s:" % info["path"]
    #    print " ", info["desc"]

    print("Plugin status:")
    # "or [0]" keeps max() from failing when there is nothing to list
    maxname = max([len(x) for x in plugins] or [0])
    for name in sorted(plugins):
        print("  %s %s" % (name.ljust(maxname), plugins[name]["status"]))

    # Value information
    print("Values:")
    maxname = max([len(x) for x in values] or [0])
    print("  %s Code Provided by" % "Value".ljust(maxname))
    for name in sorted(values):
        val = values[name]
        providers = val.get("plugins", [])
        if providers:
            provider = ", ".join(providers)
        else:
            provider = "update-apt-xapian-index"
        # Values declared by a plugin but missing from the value database
        # have no code yet
        if val["val"] is None:
            code = "%4s" % "-"
        else:
            code = "%4d" % int(val["val"])
        print("  %s %s %s" % (name.ljust(maxname), code, provider))

    # Source files information
    print("Data sources:")
    maxpath = 0
    maxdesc = 0
    for k, v in sources.items():
        if len(k) > maxpath: maxpath = len(k)
        if len(v["desc"]) > maxdesc: maxdesc = len(v["desc"])
    print("  %s %s Used by" % ("Source".ljust(maxpath), "Description".ljust(maxdesc)))
    for path in sorted(sources):
        info = sources[path]
        provider = ", ".join(info.get("plugins", []))
        print("  %s %s %s" % (path.ljust(maxpath), info["desc"].ljust(maxdesc), provider))

    return 0
def do_search(self, args):
    "search [terms]: start a new search"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Stores args as the new query, runs it and prints either the first
    # page of results or, with --all, every result. Returns 0.
    db = DB()
    self.db = db
    db.set_query_args(args)
    # A missing or empty --sort means sorting by relevance
    db.set_sort(self.opts.sort if self.opts.sort else None)
    db.build_query()

    if self.opts.all:
        self.print_all_matches(db.get_all_matches())
    else:
        self.print_matches(db.get_matches())

    # Remember the search, unless we are only computing completions
    if not self.opts.tabcomplete:
        db.save()
    return 0
def complete_search(self):
    """
    Print tab completion candidates for the search command, one per line.
    Always returns 0.
    """
    self.db = DB()
    index = self.db.db
    mode = self.opts.tabcomplete

    if mode == "partial" and len(self.args) == 1:
        # Simple prefix search
        typed = self.args[0]
        terms = set(str(key) for key in index.synonym_keys(typed))
        terms.update(item.term for item in index.allterms(typed))
        for term in sorted(terms):
            print(term)
        # Tags starting with the typed text, without their XT prefix
        for item in index.allterms("XT" + typed):
            print(item.term[2:])
        return 0

    if mode == "plain" and not self.args:
        # Show a preset list of tags
        for facet in ("interface", "role", "use", "works-with"):
            for item in index.allterms("XT" + facet + "::"):
                print(item.term[2:])
        return 0

    # Context-sensitive hints
    qargs, partial = self.tabcomplete_query_args()
    self.db.set_query_args(qargs)
    self.db.build_query()
    self.print_completions(self.db.get_matches())
    return 0
def do_again(self, args):
    "again [query]: repeat the last search, possibly adding query terms"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # args are stored as a secondary query next to the saved one, and the
    # results are shown from the first page.
    # Returns 0, or 1 when there is no saved search to repeat.
    self.db = DB()
    self.db.set_query_args(args, secondary=True)
    if self.opts.sort:
        self.db.set_sort(self.opts.sort)
    else:
        self.db.set_sort(None)
    try:
        self.db.build_query()
    except SavedStateError as e:
        # The old message passed the error text where the program name
        # was expected ("... run 'no saved query search' first?")
        sys.stderr.write("%s: %s: maybe you need to run '%s search' first?\n" % (self.name, str(e), self.name))
        return 1

    self.print_matches(self.db.get_matches(first = 0))

    if not self.opts.tabcomplete:
        self.db.save()
    return 0
def complete_again(self):
    """
    Print tab completion candidates for the again command.
    Nothing is printed when there is no saved query. Always returns 0.
    """
    db = DB()
    self.db = db
    qargs, partial = self.tabcomplete_query_args()
    db.set_query_args(qargs, secondary=True)
    try:
        db.build_query()
    except SavedStateError:
        return 0
    first_page = db.get_matches(first=0)
    self.print_completions(first_page)
    return 0
def do_last(self, args):
    "last [count]: show the last results again"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Shows again the page starting at the saved "lastfirst" offset; the
    # optional first argument is the number of results (default 20).
    # Returns 0, or 1 on a missing saved search or an invalid count.
    self.db = DB()
    try:
        self.db.build_query()
    except SavedStateError as e:
        # The old message passed the error text where the program name
        # was expected
        sys.stderr.write("%s: %s: maybe you need to run '%s search' first?\n" % (self.name, str(e), self.name))
        return 1

    try:
        count = int(args[0]) if args else 20
    except ValueError:
        # Report a bad count instead of dying with a traceback
        sys.stderr.write("%s: invalid count: `%s'\n" % (self.name, args[0]))
        return 1

    first = int(self.db.get_cache_last("lastfirst", 0))
    self.print_matches(self.db.get_matches(first=first, count=count))
    return 0
def do_more(self, args):
    "more [count]: show more terms from the last search"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Shows the next page of the saved search; the optional first argument
    # is the number of results (default 20).
    # Returns 0, or 1 on a missing saved search or an invalid count.
    self.db = DB()
    try:
        self.db.build_query()
    except SavedStateError as e:
        # The old message passed the error text where the program name
        # was expected
        sys.stderr.write("%s: %s: maybe you need to run '%s search' first?\n" % (self.name, str(e), self.name))
        return 1

    try:
        count = int(args[0]) if args else 20
    except ValueError:
        # Report a bad count instead of dying with a traceback
        sys.stderr.write("%s: invalid count: `%s'\n" % (self.name, args[0]))
        return 1

    self.print_matches(self.db.get_matches(count=count))

    if not self.opts.tabcomplete:
        self.db.save()
    return 0
def do_show(self, args):
    "show pkgname[s]: run apt-cache show pkgname[s]"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Replaces the current process with apt-cache
    argv = ["apt-cache", "show"]
    argv.extend(args)
    os.execvp("/usr/bin/apt-cache", argv)
def complete_show(self):
    """
    Print package names as tab completion candidates, one per line.

    Names already on the command line are skipped. With --last the
    candidates come from the last page of search results, otherwise from
    all the package names in the index. Always returns 0.
    """
    self.db = DB()
    if self.opts.tabcomplete == "partial" and self.args:
        # The last argument is the word being completed
        partial = self.args[-1]
        blacklist = set(self.args[:-1])
    else:
        partial = None
        blacklist = set(self.args)

    if not self.opts.last:
        # Prefix expand
        for item in self.db.db.allterms("XP" + (partial or "")):
            pkg = item.term[2:]
            if pkg not in blacklist:
                print(pkg)
        return 0

    # Expand from output of last query
    try:
        self.db.build_query()
        first = int(self.db.get_cache_last("lastfirst", 0))
        for match in self.db.get_matches(first = first):
            pkg = match.document.get_data()
            if partial is not None and not pkg.startswith(partial):
                continue
            if pkg not in blacklist:
                print(pkg)
    except SavedStateError:
        return 0
    return 0
def do_showpkg(self, args):
    "showpkg pkgname[s]: run apt-cache showpkg pkgname[s]"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Replaces the current process with apt-cache
    argv = ["apt-cache", "showpkg"]
    argv.extend(args)
    os.execvp("/usr/bin/apt-cache", argv)

# Package names are completed the same way as for show
complete_showpkg = complete_show
def do_showsrc(self, args):
    "showsrc pkgname[s]: run apt-cache showsrc pkgname[s]"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Replaces the current process with apt-cache
    argv = ["apt-cache", "showsrc"]
    argv.extend(args)
    os.execvp("/usr/bin/apt-cache", argv)

# Package names are completed the same way as for show
complete_showsrc = complete_show
def do_depends(self, args):
    "depends pkgname[s]: run apt-cache depends pkgname[s]"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Replaces the current process with apt-cache
    argv = ["apt-cache", "depends"]
    argv.extend(args)
    os.execvp("/usr/bin/apt-cache", argv)

# Package names are completed the same way as for show
complete_depends = complete_show
def do_rdepends(self, args):
    "rdepends pkgname[s]: run apt-cache rdepends pkgname[s]"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # For each package, prints the data of the documents found in the index
    # under every reverse relationship prefix.
    relation_prefixes = ("XRD", "XRR", "XRS", "XRE", "XRP", "XRB", "XRC")
    db = DB()
    for name in args:
        print(name)
        print("Reverse Depends:")
        for pfx in relation_prefixes:
            for pkg in db.get_rdeps(name, pfx):
                print("  %s" % pkg)

# Package names are completed the same way as for show
complete_rdepends = complete_show
def do_rdetails(self, args):
    "rdetails pkgname[s]: show details of reverse relationships for the given packages"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Prints one "name tag pkg pkg ..." line for every relationship prefix
    # that has at least one result.
    relations = (
        ("XRD", "dep"),
        ("XRR", "rec"),
        ("XRS", "sug"),
        ("XRE", "enh"),
        ("XRP", "pre"),
        ("XRB", "bre"),
        ("XRC", "con"),
    )
    db = DB()
    for name in args:
        for pfx, tag in relations:
            deps = list(db.get_rdeps(name, pfx))
            if deps:
                print("%s %s %s" % (name, tag, " ".join(deps)))

# Package names are completed the same way as for show
complete_rdetails = complete_show
def do_policy(self, args):
    "policy pkgname[s]: run apt-cache policy pkgname[s]"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Replaces the current process with apt-cache
    argv = ["apt-cache", "policy"]
    argv.extend(args)
    os.execvp("/usr/bin/apt-cache", argv)

# Package names are completed the same way as for show
complete_policy = complete_show
def do_madison(self, args):
    "madison pkgname[s]: run apt-cache madison pkgname[s]"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # Replaces the current process with apt-cache
    argv = ["apt-cache", "madison"]
    argv.extend(args)
    os.execvp("/usr/bin/apt-cache", argv)

# Package names are completed the same way as for show
complete_madison = complete_show
def format_help(self, out):
    """
    Write the summary of the available commands to the file-like out.

    Commands are the do_* methods of the class; each docstring is split
    at the first ": " into usage and description. Commands whose
    description contains "run apt-cache" are listed separately.
    """
    out.write("Commands:\n")

    # Collect (usage, description) pairs, in method name order
    entries = []
    for attr, func in sorted(self.__class__.__dict__.items()):
        if attr.startswith("do_"):
            usage, desc = (self.name + " " + func.__doc__).split(": ", 1)
            entries.append((usage, desc))

    # All usage strings are padded to the width of the longest one
    width = max([len(usage) for usage, desc in entries] or [0])

    def write_group(title, wanted):
        out.write(title + "\n")
        for usage, desc in entries:
            if ("run apt-cache" in desc) == wanted:
                out.write(" %-*.*s %s" % (width, width, usage, desc) + "\n")

    write_group(" search commands:", False)
    write_group(" apt-cache front-ends:", True)
def do_help(self, args):
    "help: show a summary of commands"
    # (the docstring is parsed by format_help(): keep it on a single line)
    # args is ignored; always returns 0
    parser = self.parser
    parser.print_help()
    return 0
def clean_suggestions(self, eset):
    """
    Return the list of terms in eset, in the same order, with DB prefixes
    converted to user prefixes.
    """
    #res.sort()
    return [self.db.unprefix(item.term) for item in eset]
def print_completions(self, matches):
    """
    Print up to 10 suggested terms for tab completion, one per line.

    Terms already on the command line are excluded; in "partial" mode the
    last argument is used as the prefix the suggestions must match. Tags
    are printed without their "tag:" prefix. matches is not used.
    """
    prefix = None
    exclude = self.args
    if self.opts.tabcomplete == "partial":
        if self.args:
            prefix = self.args[-1]
        exclude = self.args[:-1]
    # Boolean operators are not search terms
    exclude = [word for word in exclude if word.lower() not in self.BOOLWORDS]

    decider = DB.BasicFilter(stemmer=self.db.stem, exclude=exclude, prefix=prefix)
    suggestions = self.db.get_suggestions(count=10, filter=decider)
    for s in self.clean_suggestions(suggestions):
        print(s[4:] if s.startswith("tag:") else s)
def print_matches(self, matches):
    """
    Print a page of search results.

    With --tags the tags suggested for the current query are printed,
    each with a score relative to the first one and, when the vocabulary
    provides it, a description. Otherwise the packages in the Xapian mset
    matches are printed, followed by hints (spelling correction, more
    terms, more tags) and pagination instructions.
    """
    if self.opts.tags:
        facets, tags = self.db.vocabulary()

        def describe_tag(tag):
            # The vocabulary is (None, None) when it could not be read
            if facets is None or tags is None:
                return None
            # The facet is whatever comes before the first "::"
            facet = tag.split("::", 1)[0]
            try:
                fd = facets[facet]["Description"].split("\n", 1)[0].strip()
                td = tags[tag]["Description"].split("\n", 1)[0].strip()
            except KeyError:
                return None
            return "%s: %s" % (fd, td)

        maxscore = None
        for res in self.db.get_suggestions(count=20, filter=DB.TagFilter()):
            # Normalise the score in the interval [0, 1]
            # (the logarithm is only defined for positive weights)
            weight = math.log(res.weight) if res.weight > 0 else 0.0
            if maxscore is None: maxscore = weight
            # The first weight is the reference; without a positive
            # reference the score cannot be normalised
            if maxscore > 0:
                score = float(weight) / maxscore
            else:
                score = 0.0
            tag = self.db.unprefix(res.term)[4:]
            desc = describe_tag(tag)
            if desc is None:
                print("%i%% %s" % (int(score * 100), tag))
            else:
                print("%i%% %s -- %s" % (int(score * 100), tag, desc))
    else:
        est = matches.get_matches_estimated()
        first = matches.get_firstitem()
        count = matches.size()
        print("%i results found." % est)
        if count != 0:
            print("Results %i-%i:" % (first + 1, first + count))
        elif first != 0:
            print("No more results to show")
        self.print_all_matches((m for m in matches))
        # Hints are only shown on the first page
        if first == 0:
            sc = self.db.get_spelling_correction()
            if sc:
                print("Did you mean: %s ?" % sc)

            sugg = self.clean_suggestions(self.db.get_suggestions(count=7, filter=DB.TermFilter(stemmer=self.db.stem, exclude=self.args)))
            print("More terms: %s" % " ".join(sugg))

            stags = self.clean_suggestions(self.db.get_suggestions(count=7, filter=DB.TagFilter()))
            print("More tags: %s" % " ".join([x[4:] for x in stags]))

        if first + count < est:
            print("`%s more' will give more results" % self.name)
        if first > 0:
            print("`%s again' will restart the search" % self.name)
def print_all_matches(self, matches_iter):
    """
    Given an iterator of Xapian matches, print them all out nicely
    formatted

    Each line shows the match percentage, the package name and the
    summary of the candidate version known to apt, if any.
    """
    aptcache = apt.Cache(progress=AptSilentProgress())
    for match in matches_iter:
        name = match.document.get_data()
        try:
            pkg = aptcache[name]
        except KeyError:
            pkg = None
        if pkg is None or not pkg.candidate:
            line = "%i%% %s - (unknown by apt)" % (match.percent, name)
        else:
            line = "%i%% %s - %s" % (match.percent, name, pkg.candidate.summary)
        print(line)
def perform(self):
    """
    Run the command given on the command line and return its exit code.

    The first argument selects the command ("help" when there are no
    arguments). With --tabcomplete the complete_<cmd> method is run,
    otherwise the do_<cmd> one.
    """
    if self.args:
        self.cmd = self.args.pop(0)
    else:
        self.cmd = "help"

    if self.opts.tabcomplete is not None:
        # Commands without a completion method simply complete to nothing
        completer = getattr(self, "complete_" + self.cmd, None)
        if completer is None:
            return 0
        return completer()

    handler = getattr(self, "do_" + self.cmd, None)
    if handler is None:
        sys.stderr.write("Invalid command: `%s'.\n\n" % self.cmd)
        self.do_help(self.args)
        return 1
    try:
        return handler(self.args)
    except xapian.FeatureUnavailableError:
        sys.stderr.write("Existing Xapian index is incompatible, please rebuild it using\n\n")
        sys.stderr.write("the update-apt-xapian-index command run as root (using sudo/su).\n\n")
        return 1
if __name__ == "__main__":
    # Building the command line object also parses the options
    ui = Cmdline()
    # In help-only mode no command is run
    if helponly:
        exit_code = 0
    else:
        exit_code = ui.perform()
    sys.exit(exit_code)
|