/usr/share/pyshared/pika/log.py is in python-pika 0.9.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# ***** BEGIN LICENSE BLOCK *****
#
# For copyright and licensing please refer to COPYING.
#
# ***** END LICENSE BLOCK *****
# curses is optional: when it cannot be imported the name is bound to None,
# which the colour-handling code checks before touching the terminal.
try:
    import curses
except ImportError:
    curses = None

try:
    from functools import wraps
except ImportError:
    def wraps(method):
        """
        Fallback for interpreters without functools.wraps.

        wraps() is applied as ``@wraps(method)``, i.e. it is called with the
        wrapped callable and must return a decorator. The fallback copies no
        metadata; it hands the decorated function back unchanged.
        """
        def decorator(function):
            return function
        return decorator

import logging
import sys
class NullHandler(logging.Handler):
    """
    Logging handler that discards every record handed to it.
    """

    def emit(self, record):
        """
        Accept the record and do nothing with it.
        """
        return None
# Logger shared by the whole package: it starts at WARNING and carries a
# NullHandler that discards whatever reaches it.
logger = logging.getLogger('pika')
logger.setLevel(logging.WARNING)
logger.addHandler(NullHandler())

# Module-level shortcuts bound to the package logger's methods
debug, error, info, warning = (logger.debug, logger.error,
                               logger.info, logger.warning)

# Level constants re-exported from the logging module
DEBUG, ERROR, INFO, WARNING = (logging.DEBUG, logging.ERROR,
                               logging.INFO, logging.WARNING)

# Default Level
LEVEL = INFO
def method_call(method):
    """
    Logging decorator to send the method and arguments to logger.debug

    Every call of the decorated callable is reported on the module logger as
    ``<module>.<name>(<arguments>) Called`` (or ``<module>.<name>() Called``
    when there is nothing to show) before the callable itself runs. The
    callable's return value is passed through untouched, and nothing is
    formatted unless the logger is enabled for DEBUG.

    Parameters:
    - method: The function or method to wrap.
    """
    @wraps(method)
    def debug_log(*args, **kwargs):
        # isEnabledFor also covers levels below DEBUG and logging.disable()
        if logger.isEnabledFor(DEBUG):
            method_name = '%s.%s' % (method.__module__, method.__name__)

            # Leave a leading self/cls out of the log: a bound instance or
            # class exposes the decorated callable under its own name.
            offset = 0
            if args and hasattr(args[0], method.__name__):
                offset = 1

            # Build a list of arguments to send to the logger; keyword
            # arguments are appended as one dict whenever any were passed.
            log_args = list(args[offset:])
            if kwargs:
                log_args.append(kwargs)

            if log_args:
                logger.debug("%s(%r) Called", method_name, log_args)
            else:
                logger.debug("%s() Called", method_name)

        # Actually execute the method
        return method(*args, **kwargs)

    # Return the debug_log function to the python stack for execution
    return debug_log
def setup(level=INFO, color=False):
    """
    Setup Pika logging, useful for console debugging and logging pika info,
    warning, and error messages.

    Calls logging.basicConfig with the given level and sets the same level
    on the 'pika' logger. When color is requested, curses is available and
    stderr is a terminal reporting color support, the first StreamHandler of
    the root logger gets a FormatOutput formatter. The module-level curses
    reference is left untouched, so a later call may still enable color.

    Parameters:
    - level: Logging level. One of DEBUG, ERROR, INFO, WARNING.
             Default: INFO
    - color: Use colorized output. Default: False
    """
    logging.basicConfig(level=level)
    logging.getLogger('pika').setLevel(level)

    # Color is best effort: it needs the optional curses module and a
    # terminal on stderr, otherwise the plain format stays in place.
    if not (color and curses and sys.stderr.isatty()):
        return

    # Setup Curses
    try:
        curses.setupterm()
    except curses.error:
        # No usable terminal description; keep the plain formatter
        return

    # Without color support there is nothing to colorize
    if curses.tigetnum("colors") <= 0:
        return

    # Assign a FormatOutput Formatter to the first StreamHandler instance
    for handler in logging.getLogger('').handlers:
        if isinstance(handler, logging.StreamHandler):
            handler.setFormatter(FormatOutput())
            break
class FormatOutput(logging.Formatter):
    """
    Creates a colorized output format for logging that helps provide easier
    context in debugging

    Each line is rendered as ``[LEVEL TIME #PID] message`` with the bracketed
    prefix tinted per level. Messages of the form ``name(args) Called`` get
    the name and the arguments tinted separately. Instantiating the class
    queries curses for terminal capabilities, so curses.setupterm() must have
    been called beforehand.
    """

    # Tail of the messages describing a traced call: "name(args) Called"
    _CALL_SUFFIX = ') Called'

    # Package prefix dropped from traced names to keep the lines short
    _PACKAGE_PREFIX = 'pika.'

    def __init__(self, *args, **kwargs):
        """
        Look up the terminal's color sequences and build the line prefix.

        Positional and keyword arguments are handed to logging.Formatter.
        """
        logging.Formatter.__init__(self, *args, **kwargs)
        capability = curses.tigetstr("setaf") or curses.tigetstr("setf")
        self._level = {DEBUG: self._paint(capability, 6),
                       ERROR: self._paint(capability, 1),
                       INFO: self._paint(capability, 2),
                       WARNING: self._paint(capability, 3)}
        self._reset = self._as_text(curses.tigetstr("sgr0"))
        self._class = self._paint(capability, 4)
        self._args = self._paint(capability, 2)
        elements = ['%(levelname)-8s', '%(asctime)-24s', '#%(process)s']
        self._prefix = '[%s]' % ' '.join(elements)

    @staticmethod
    def _as_text(sequence):
        """
        Return a terminal control sequence as a native string.

        curses reports a missing capability as None and, on Python 3, hands
        sequences back as bytes; both would otherwise end up in the output
        as the literal text "None" or "b'...'".
        """
        if sequence is None:
            return ''
        if isinstance(sequence, str):
            return sequence
        return sequence.decode('latin-1')

    def _paint(self, capability, color_number):
        """
        Return the sequence selecting color_number, or '' when the terminal
        offers no foreground color capability.
        """
        if not capability:
            return ''
        return self._as_text(curses.tparm(capability, color_number))

    def _highlight_call(self, message):
        """
        Colorize a ``name(args) Called`` message; return others unchanged.
        """
        opening = message.find('(')
        if opening < 0 or not message.endswith(self._CALL_SUFFIX):
            return message
        method_name = message[:opening]
        # Only strip the package prefix when it is really there
        if method_name.startswith(self._PACKAGE_PREFIX):
            method_name = method_name[len(self._PACKAGE_PREFIX):]
        arguments = message[opening + 1:-len(self._CALL_SUFFIX)]
        return '%s%s%s(%s%s%s)' % (self._class,
                                   method_name,
                                   self._reset,
                                   self._args,
                                   arguments,
                                   self._reset)

    def format(self, record):
        """
        Return the colorized line for record, followed by the formatted
        traceback when the record carries exception information.
        """
        message = self._highlight_call(record.getMessage())
        # The prefix template refers to %(asctime)s, so it has to be set
        record.asctime = self.formatTime(record)
        output = "%s%s%s %s" % (self._level.get(record.levelno,
                                                self._reset),
                                self._prefix % record.__dict__,
                                self._reset,
                                message)
        # Cache the traceback text on the record, as logging.Formatter does
        if record.exc_info and not record.exc_text:
            record.exc_text = self.formatException(record.exc_info)
        if record.exc_text:
            output = output.rstrip() + "\n " + record.exc_text
        return output
|