/usr/share/pyshared/blogofile/filter.py is in blogofile 0.8b1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 | # -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import os
import logging
import imp
import uuid
logger = logging.getLogger("blogofile.filter")
from .cache import bf
from .cache import HierarchicalCache
from . import exception
bf.filter = sys.modules['blogofile.filter']
default_filter_config = {"name": None,
"description": None,
"author": None,
"url": None}
def run_chain(chain, content):
"""Run content through a filter chain.
Works with either a string or a sequence of filters
"""
if chain is None:
return content
# lib3to2 interprets str as meaning unicode instead of basestring,
# hand craft the translation to python2:
if sys.version_info >= (3,):
is_str = eval("isinstance(chain, str)")
else:
is_str = eval("isinstance(chain, basestring)")
if is_str:
chain = parse_chain(chain)
for fn in chain:
f = get_filter(fn)
logger.debug("Applying filter: " + fn)
content = f.run(content)
logger.debug("Content: " + content)
return content
def parse_chain(chain):
"""Parse a filter chain into a sequence of filters.
"""
parts = []
for p in chain.split(","):
p = p.strip()
if p.lower() == "none":
continue
if len(p) > 0:
parts.append(p)
return parts
def preload_filters(namespace=None, directory="_filters"):
"""Find all the standalone .py files and modules in the directory
specified and load them into namespace specified.
"""
if namespace is None:
namespace = bf.config.filters
if(not os.path.isdir(directory)):
return
for fn in os.listdir(directory):
p = os.path.join(directory, fn)
if (os.path.isfile(p) and fn.endswith(".py")):
# Load a single .py file:
load_filter(fn[:-3], module_path=p, namespace=namespace)
elif (os.path.isdir(p)
and os.path.isfile(os.path.join(p, "__init__.py"))):
# Load a package:
load_filter(fn, module_path=p, namespace=namespace)
def init_filters(namespace=None):
"""Filters have an optional init method that runs before the site
is built.
"""
if namespace is None:
namespace = bf.config.filters
for name, filt in list(namespace.items()):
if "mod" in filt \
and type(filt.mod).__name__ == "module"\
and not filt.mod.__initialized:
try:
init_method = filt.mod.init
except AttributeError:
filt.mod.__initialized = True
continue
logger.debug("Initializing filter: " + name)
init_method()
filt.mod.__initialized = True
def get_filter(name, namespace=None):
"""Return an already loaded filter.
"""
if namespace is None:
if name.startswith("bf") and "." in name:
# Name is an absolute reference to a filter in a given
# namespace; extract the namespace
namespace, name = name.rsplit(".", 1)
namespace = eval(namespace)
else:
namespace = bf.config.filters
if name in namespace and "mod" in namespace[name]:
logger.debug("Retrieving already loaded filter: " + name)
return namespace[name]['mod']
else:
raise exception.FilterNotLoaded("Filter not loaded: {0}".format(name))
def load_filter(name, module_path, namespace=None):
"""Load a filter from the site's _filters directory.
"""
if namespace is None:
namespace = bf.config.filters
try:
initial_dont_write_bytecode = sys.dont_write_bytecode
except KeyError:
initial_dont_write_bytecode = False
try:
# Don't generate .pyc files in the _filters directory
sys.dont_write_bytecode = True
if module_path.endswith(".py"):
mod = imp.load_source(
"{0}_{1}".format(name, uuid.uuid4()), module_path)
else:
mod = imp.load_package(
"{0}_{1}".format(name, uuid.uuid4()), module_path)
logger.debug("Loaded filter for first time: {0}".format(module_path))
mod.__initialized = False
# Overwrite anything currently in this namespace:
try:
del namespace[name]
except KeyError:
pass
# If the filter defines it's own configuration, use that as
# it's own namespace:
if hasattr(mod, "config") and \
isinstance(mod.config, HierarchicalCache):
namespace[name] = mod.config
# Load the module into the namespace
namespace[name].mod = mod
# If the filter has any aliases, load those as well
try:
for alias in mod.config['aliases']:
namespace[alias] = namespace[name]
except:
pass
# Load the default blogofile config for filters:
for k, v in list(default_filter_config.items()):
namespace[name][k] = v
# Load any filter defined defaults:
try:
filter_config = getattr(mod, "config")
for k, v in list(filter_config.items()):
if "." in k:
# This is a hierarchical setting
tail = namespace[name]
parts = k.split(".")
for part in parts[:-1]:
tail = tail[part]
tail[parts[-1]] = v
else:
namespace[name][k] = v
except AttributeError:
pass
return mod
except:
logger.error("Cannot load filter: " + name)
raise
finally:
# Reset the original sys.dont_write_bytecode setting where we're done
sys.dont_write_bytecode = initial_dont_write_bytecode
def list_filters(args):
from . import config, plugin
config.init_interactive()
plugin.init_plugins()
# module path -> list of aliases
filters = {}
for name, filt in bf.config.filters.items():
if "mod" in filt:
aliases = filters.get(filt.mod.__file__, [])
aliases.append(name)
filters[filt.mod.__file__] = aliases
for mod_path, aliases in filters.items():
print("{0} - {1}\n".format(", ".join(aliases), mod_path))
|