/usr/share/pyshared/bzrlib/ui/text.py is in python-bzrlib 2.6.0~bzr6526-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 | # Copyright (C) 2005-2011 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Text UI, write output to the console.
"""
from __future__ import absolute_import
import os
import sys
import time
from bzrlib.lazy_import import lazy_import
lazy_import(globals(), """
import codecs
import getpass
import warnings
from bzrlib import (
debug,
progress,
osutils,
trace,
)
""")
from bzrlib.ui import (
UIFactory,
NullProgressView,
)
class _ChooseUI(object):
""" Helper class for choose implementation.
"""
def __init__(self, ui, msg, choices, default):
self.ui = ui
self._setup_mode()
self._build_alternatives(msg, choices, default)
def _setup_mode(self):
"""Setup input mode (line-based, char-based) and echo-back.
Line-based input is used if the BZR_TEXTUI_INPUT environment
variable is set to 'line-based', or if there is no controlling
terminal.
"""
if os.environ.get('BZR_TEXTUI_INPUT') != 'line-based' and \
self.ui.stdin == sys.stdin and self.ui.stdin.isatty():
self.line_based = False
self.echo_back = True
else:
self.line_based = True
self.echo_back = not self.ui.stdin.isatty()
def _build_alternatives(self, msg, choices, default):
"""Parse choices string.
Setup final prompt and the lists of choices and associated
shortcuts.
"""
index = 0
help_list = []
self.alternatives = {}
choices = choices.split('\n')
if default is not None and default not in range(0, len(choices)):
raise ValueError("invalid default index")
for c in choices:
name = c.replace('&', '').lower()
choice = (name, index)
if name in self.alternatives:
raise ValueError("duplicated choice: %s" % name)
self.alternatives[name] = choice
shortcut = c.find('&')
if -1 != shortcut and (shortcut + 1) < len(c):
help = c[:shortcut]
help += '[' + c[shortcut + 1] + ']'
help += c[(shortcut + 2):]
shortcut = c[shortcut + 1]
else:
c = c.replace('&', '')
shortcut = c[0]
help = '[%s]%s' % (shortcut, c[1:])
shortcut = shortcut.lower()
if shortcut in self.alternatives:
raise ValueError("duplicated shortcut: %s" % shortcut)
self.alternatives[shortcut] = choice
# Add redirections for default.
if index == default:
self.alternatives[''] = choice
self.alternatives['\r'] = choice
help_list.append(help)
index += 1
self.prompt = u'%s (%s): ' % (msg, ', '.join(help_list))
def _getline(self):
line = self.ui.stdin.readline()
if '' == line:
raise EOFError
return line.strip()
def _getchar(self):
char = osutils.getchar()
if char == chr(3): # INTR
raise KeyboardInterrupt
if char == chr(4): # EOF (^d, C-d)
raise EOFError
return char
def interact(self):
"""Keep asking the user until a valid choice is made.
"""
if self.line_based:
getchoice = self._getline
else:
getchoice = self._getchar
iter = 0
while True:
iter += 1
if 1 == iter or self.line_based:
self.ui.prompt(self.prompt)
try:
choice = getchoice()
except EOFError:
self.ui.stderr.write('\n')
return None
except KeyboardInterrupt:
self.ui.stderr.write('\n')
raise KeyboardInterrupt
choice = choice.lower()
if choice not in self.alternatives:
# Not a valid choice, keep on asking.
continue
name, index = self.alternatives[choice]
if self.echo_back:
self.ui.stderr.write(name + '\n')
return index
class TextUIFactory(UIFactory):
"""A UI factory for Text user interefaces."""
def __init__(self,
stdin=None,
stdout=None,
stderr=None):
"""Create a TextUIFactory.
"""
super(TextUIFactory, self).__init__()
# TODO: there's no good reason not to pass all three streams, maybe we
# should deprecate the default values...
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
# paints progress, network activity, etc
self._progress_view = self.make_progress_view()
def choose(self, msg, choices, default=None):
"""Prompt the user for a list of alternatives.
Support both line-based and char-based editing.
In line-based mode, both the shortcut and full choice name are valid
answers, e.g. for choose('prompt', '&yes\n&no'): 'y', ' Y ', ' yes',
'YES ' are all valid input lines for choosing 'yes'.
An empty line, when in line-based mode, or pressing enter in char-based
mode will select the default choice (if any).
Choice is echoed back if:
- input is char-based; which means a controlling terminal is available,
and osutils.getchar is used
- input is line-based, and no controlling terminal is available
"""
choose_ui = _ChooseUI(self, msg, choices, default)
return choose_ui.interact()
def be_quiet(self, state):
if state and not self._quiet:
self.clear_term()
UIFactory.be_quiet(self, state)
self._progress_view = self.make_progress_view()
def clear_term(self):
"""Prepare the terminal for output.
This will, clear any progress bars, and leave the cursor at the
leftmost position."""
# XXX: If this is preparing to write to stdout, but that's for example
# directed into a file rather than to the terminal, and the progress
# bar _is_ going to the terminal, we shouldn't need
# to clear it. We might need to separately check for the case of
self._progress_view.clear()
def get_integer(self, prompt):
while True:
self.prompt(prompt)
line = self.stdin.readline()
try:
return int(line)
except ValueError:
pass
def get_non_echoed_password(self):
isatty = getattr(self.stdin, 'isatty', None)
if isatty is not None and isatty():
# getpass() ensure the password is not echoed and other
# cross-platform niceties
password = getpass.getpass('')
else:
# echo doesn't make sense without a terminal
password = self.stdin.readline()
if not password:
password = None
elif password[-1] == '\n':
password = password[:-1]
return password
def get_password(self, prompt=u'', **kwargs):
"""Prompt the user for a password.
:param prompt: The prompt to present the user
:param kwargs: Arguments which will be expanded into the prompt.
This lets front ends display different things if
they so choose.
:return: The password string, return None if the user
canceled the request.
"""
prompt += ': '
self.prompt(prompt, **kwargs)
# There's currently no way to say 'i decline to enter a password'
# as opposed to 'my password is empty' -- does it matter?
return self.get_non_echoed_password()
def get_username(self, prompt, **kwargs):
"""Prompt the user for a username.
:param prompt: The prompt to present the user
:param kwargs: Arguments which will be expanded into the prompt.
This lets front ends display different things if
they so choose.
:return: The username string, return None if the user
canceled the request.
"""
prompt += ': '
self.prompt(prompt, **kwargs)
username = self.stdin.readline()
if not username:
username = None
elif username[-1] == '\n':
username = username[:-1]
return username
def make_progress_view(self):
"""Construct and return a new ProgressView subclass for this UI.
"""
# with --quiet, never any progress view
# <https://bugs.launchpad.net/bzr/+bug/320035>. Otherwise if the
# user specifically requests either text or no progress bars, always
# do that. otherwise, guess based on $TERM and tty presence.
if self.is_quiet():
return NullProgressView()
elif os.environ.get('BZR_PROGRESS_BAR') == 'text':
return TextProgressView(self.stderr)
elif os.environ.get('BZR_PROGRESS_BAR') == 'none':
return NullProgressView()
elif progress._supports_progress(self.stderr):
return TextProgressView(self.stderr)
else:
return NullProgressView()
def _make_output_stream_explicit(self, encoding, encoding_type):
if encoding_type == 'exact':
# force sys.stdout to be binary stream on win32;
# NB: this leaves the file set in that mode; may cause problems if
# one process tries to do binary and then text output
if sys.platform == 'win32':
fileno = getattr(self.stdout, 'fileno', None)
if fileno:
import msvcrt
msvcrt.setmode(fileno(), os.O_BINARY)
return TextUIOutputStream(self, self.stdout)
else:
encoded_stdout = codecs.getwriter(encoding)(self.stdout,
errors=encoding_type)
# For whatever reason codecs.getwriter() does not advertise its encoding
# it just returns the encoding of the wrapped file, which is completely
# bogus. So set the attribute, so we can find the correct encoding later.
encoded_stdout.encoding = encoding
return TextUIOutputStream(self, encoded_stdout)
def note(self, msg):
"""Write an already-formatted message, clearing the progress bar if necessary."""
self.clear_term()
self.stdout.write(msg + '\n')
def prompt(self, prompt, **kwargs):
"""Emit prompt on the CLI.
:param kwargs: Dictionary of arguments to insert into the prompt,
to allow UIs to reformat the prompt.
"""
if type(prompt) != unicode:
raise ValueError("prompt %r not a unicode string" % prompt)
if kwargs:
# See <https://launchpad.net/bugs/365891>
prompt = prompt % kwargs
prompt = prompt.encode(osutils.get_terminal_encoding(), 'replace')
self.clear_term()
self.stdout.flush()
self.stderr.write(prompt)
def report_transport_activity(self, transport, byte_count, direction):
"""Called by transports as they do IO.
This may update a progress bar, spinner, or similar display.
By default it does nothing.
"""
self._progress_view.show_transport_activity(transport,
direction, byte_count)
def log_transport_activity(self, display=False):
"""See UIFactory.log_transport_activity()"""
log = getattr(self._progress_view, 'log_transport_activity', None)
if log is not None:
log(display=display)
def show_error(self, msg):
self.clear_term()
self.stderr.write("bzr: error: %s\n" % msg)
def show_message(self, msg):
self.note(msg)
def show_warning(self, msg):
self.clear_term()
if isinstance(msg, unicode):
te = osutils.get_terminal_encoding()
msg = msg.encode(te, 'replace')
self.stderr.write("bzr: warning: %s\n" % msg)
def _progress_updated(self, task):
"""A task has been updated and wants to be displayed.
"""
if not self._task_stack:
warnings.warn("%r updated but no tasks are active" %
(task,))
elif task != self._task_stack[-1]:
# We used to check it was the top task, but it's hard to always
# get this right and it's not necessarily useful: any actual
# problems will be evident in use
#warnings.warn("%r is not the top progress task %r" %
# (task, self._task_stack[-1]))
pass
self._progress_view.show_progress(task)
def _progress_all_finished(self):
self._progress_view.clear()
def show_user_warning(self, warning_id, **message_args):
"""Show a text message to the user.
Explicitly not for warnings about bzr apis, deprecations or internals.
"""
# eventually trace.warning should migrate here, to avoid logging and
# be easier to test; that has a lot of test fallout so for now just
# new code can call this
if warning_id not in self.suppressed_warnings:
self.stderr.write(self.format_user_warning(warning_id, message_args) +
'\n')
class TextProgressView(object):
"""Display of progress bar and other information on a tty.
This shows one line of text, including possibly a network indicator, spinner,
progress bar, message, etc.
One instance of this is created and held by the UI, and fed updates when a
task wants to be painted.
Transports feed data to this through the ui_factory object.
The Progress views can comprise a tree with _parent_task pointers, but
this only prints the stack from the nominated current task up to the root.
"""
def __init__(self, term_file, encoding=None, errors="replace"):
self._term_file = term_file
if encoding is None:
self._encoding = getattr(term_file, "encoding", None) or "ascii"
else:
self._encoding = encoding
self._encoding_errors = errors
# true when there's output on the screen we may need to clear
self._have_output = False
self._last_transport_msg = ''
self._spin_pos = 0
# time we last repainted the screen
self._last_repaint = 0
# time we last got information about transport activity
self._transport_update_time = 0
self._last_task = None
self._total_byte_count = 0
self._bytes_since_update = 0
self._bytes_by_direction = {'unknown': 0, 'read': 0, 'write': 0}
self._first_byte_time = None
self._fraction = 0
# force the progress bar to be off, as at the moment it doesn't
# correspond reliably to overall command progress
self.enable_bar = False
def _avail_width(self):
# we need one extra space for terminals that wrap on last char
w = osutils.terminal_width()
if w is None:
return None
else:
return w - 1
def _show_line(self, u):
s = u.encode(self._encoding, self._encoding_errors)
width = self._avail_width()
if width is not None:
# GZ 2012-03-28: Counting bytes is wrong for calculating width of
# text but better than counting codepoints.
s = '%-*.*s' % (width, width, s)
self._term_file.write('\r' + s + '\r')
def clear(self):
if self._have_output:
self._show_line('')
self._have_output = False
def _render_bar(self):
# return a string for the progress bar itself
if self.enable_bar and (
(self._last_task is None) or self._last_task.show_bar):
# If there's no task object, we show space for the bar anyhow.
# That's because most invocations of bzr will end showing progress
# at some point, though perhaps only after doing some initial IO.
# It looks better to draw the progress bar initially rather than
# to have what looks like an incomplete progress bar.
spin_str = r'/-\|'[self._spin_pos % 4]
self._spin_pos += 1
cols = 20
if self._last_task is None:
completion_fraction = 0
self._fraction = 0
else:
completion_fraction = \
self._last_task._overall_completion_fraction() or 0
if (completion_fraction < self._fraction and 'progress' in
debug.debug_flags):
import pdb;pdb.set_trace()
self._fraction = completion_fraction
markers = int(round(float(cols) * completion_fraction)) - 1
bar_str = '[' + ('#' * markers + spin_str).ljust(cols) + '] '
return bar_str
elif (self._last_task is None) or self._last_task.show_spinner:
# The last task wanted just a spinner, no bar
spin_str = r'/-\|'[self._spin_pos % 4]
self._spin_pos += 1
return spin_str + ' '
else:
return ''
def _format_task(self, task):
"""Format task-specific parts of progress bar.
:returns: (text_part, counter_part) both unicode strings.
"""
if not task.show_count:
s = ''
elif task.current_cnt is not None and task.total_cnt is not None:
s = ' %d/%d' % (task.current_cnt, task.total_cnt)
elif task.current_cnt is not None:
s = ' %d' % (task.current_cnt)
else:
s = ''
# compose all the parent messages
t = task
m = task.msg
while t._parent_task:
t = t._parent_task
if t.msg:
m = t.msg + ':' + m
return m, s
def _render_line(self):
bar_string = self._render_bar()
if self._last_task:
task_part, counter_part = self._format_task(self._last_task)
else:
task_part = counter_part = ''
if self._last_task and not self._last_task.show_transport_activity:
trans = ''
else:
trans = self._last_transport_msg
# the bar separates the transport activity from the message, so even
# if there's no bar or spinner, we must show something if both those
# fields are present
if (task_part or trans) and not bar_string:
bar_string = '| '
# preferentially truncate the task message if we don't have enough
# space
avail_width = self._avail_width()
if avail_width is not None:
# if terminal avail_width is unknown, don't truncate
current_len = len(bar_string) + len(trans) + len(task_part) + len(counter_part)
gap = current_len - avail_width
if gap > 0:
task_part = task_part[:-gap-2] + '..'
s = trans + bar_string + task_part + counter_part
if avail_width is not None:
if len(s) < avail_width:
s = s.ljust(avail_width)
elif len(s) > avail_width:
s = s[:avail_width]
return s
def _repaint(self):
s = self._render_line()
self._show_line(s)
self._have_output = True
def show_progress(self, task):
"""Called by the task object when it has changed.
:param task: The top task object; its parents are also included
by following links.
"""
must_update = task is not self._last_task
self._last_task = task
now = time.time()
if (not must_update) and (now < self._last_repaint + task.update_latency):
return
if now > self._transport_update_time + 10:
# no recent activity; expire it
self._last_transport_msg = ''
self._last_repaint = now
self._repaint()
def show_transport_activity(self, transport, direction, byte_count):
"""Called by transports via the ui_factory, as they do IO.
This may update a progress bar, spinner, or similar display.
By default it does nothing.
"""
# XXX: there should be a transport activity model, and that too should
# be seen by the progress view, rather than being poked in here.
self._total_byte_count += byte_count
self._bytes_since_update += byte_count
if self._first_byte_time is None:
# Note that this isn't great, as technically it should be the time
# when the bytes started transferring, not when they completed.
# However, we usually start with a small request anyway.
self._first_byte_time = time.time()
if direction in self._bytes_by_direction:
self._bytes_by_direction[direction] += byte_count
else:
self._bytes_by_direction['unknown'] += byte_count
if 'no_activity' in debug.debug_flags:
# Can be used as a workaround if
# <https://launchpad.net/bugs/321935> reappears and transport
# activity is cluttering other output. However, thanks to
# TextUIOutputStream this shouldn't be a problem any more.
return
now = time.time()
if self._total_byte_count < 2000:
# a little resistance at first, so it doesn't stay stuck at 0
# while connecting...
return
if self._transport_update_time is None:
self._transport_update_time = now
elif now >= (self._transport_update_time + 0.5):
# guard against clock stepping backwards, and don't update too
# often
rate = (self._bytes_since_update
/ (now - self._transport_update_time))
# using base-10 units (see HACKING.txt).
msg = ("%6dkB %5dkB/s " %
(self._total_byte_count / 1000, int(rate) / 1000,))
self._transport_update_time = now
self._last_repaint = now
self._bytes_since_update = 0
self._last_transport_msg = msg
self._repaint()
def _format_bytes_by_direction(self):
if self._first_byte_time is None:
bps = 0.0
else:
transfer_time = time.time() - self._first_byte_time
if transfer_time < 0.001:
transfer_time = 0.001
bps = self._total_byte_count / transfer_time
# using base-10 units (see HACKING.txt).
msg = ('Transferred: %.0fkB'
' (%.1fkB/s r:%.0fkB w:%.0fkB'
% (self._total_byte_count / 1000.,
bps / 1000.,
self._bytes_by_direction['read'] / 1000.,
self._bytes_by_direction['write'] / 1000.,
))
if self._bytes_by_direction['unknown'] > 0:
msg += ' u:%.0fkB)' % (
self._bytes_by_direction['unknown'] / 1000.
)
else:
msg += ')'
return msg
def log_transport_activity(self, display=False):
msg = self._format_bytes_by_direction()
trace.mutter(msg)
if display and self._total_byte_count > 0:
self.clear()
self._term_file.write(msg + '\n')
class TextUIOutputStream(object):
"""Decorates an output stream so that the terminal is cleared before writing.
This is supposed to ensure that the progress bar does not conflict with bulk
text output.
"""
# XXX: this does not handle the case of writing part of a line, then doing
# progress bar output: the progress bar will probably write over it.
# one option is just to buffer that text until we have a full line;
# another is to save and restore it
# XXX: might need to wrap more methods
def __init__(self, ui_factory, wrapped_stream):
self.ui_factory = ui_factory
self.wrapped_stream = wrapped_stream
# this does no transcoding, but it must expose the underlying encoding
# because some callers need to know what can be written - see for
# example unescape_for_display.
self.encoding = getattr(wrapped_stream, 'encoding', None)
def flush(self):
self.ui_factory.clear_term()
self.wrapped_stream.flush()
def write(self, to_write):
self.ui_factory.clear_term()
self.wrapped_stream.write(to_write)
def writelines(self, lines):
self.ui_factory.clear_term()
self.wrapped_stream.writelines(lines)
|