/usr/lib/python2.7/dist-packages/Bcfg2/Server/Lint/Validate.py is in bcfg2-server 1.4.0~pre2+git141-g6d40dace6358-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Validate XML files.
Ensure that all XML files in the Bcfg2 repository validate according
to their respective schemas.
"""
import glob
import os
import sys
import lxml.etree
import Bcfg2.Options
import Bcfg2.Server.Lint
from Bcfg2.Utils import Executor
class Validate(Bcfg2.Server.Lint.ServerlessPlugin):
    """ Ensure that all XML files in the Bcfg2 repository validate
    according to their respective schemas.

    Files are matched against schemas via :attr:`filesets`; Properties
    files are validated against a sibling ``<name>.xsd`` if one
    exists.  Validation is done with lxml, and ``xmllint`` is invoked
    to produce the human-readable error text that is reported. """

    options = Bcfg2.Server.Lint.ServerlessPlugin.options + [
        Bcfg2.Options.PathOption(
            "--schema", cf=("Validate", "schema"),
            default="/usr/share/bcfg2/schemas",
            help="The full path to the XML schema files")]

    def __init__(self, *args, **kwargs):
        Bcfg2.Server.Lint.ServerlessPlugin.__init__(self, *args, **kwargs)

        #: A dict of <file glob>: <schema file> that maps files in the
        #: Bcfg2 specification to their schemas.  The globs are
        #: extended :mod:`fnmatch` globs that also support ``**``,
        #: which matches any number of any characters, including
        #: forward slashes.  The schema files are relative to the
        #: schema directory, which can be controlled by the
        #: ``bcfg2-lint --schema`` option.
        self.filesets = {
            "Metadata/groups.xml": "metadata.xsd",
            "Metadata/clients.xml": "clients.xsd",
            "Cfg/**/info.xml": "info.xsd",
            "Cfg/**/privkey.xml": "privkey.xsd",
            "Cfg/**/pubkey.xml": "pubkey.xsd",
            "Cfg/**/authorizedkeys.xml": "authorizedkeys.xsd",
            "Cfg/**/authorized_keys.xml": "authorizedkeys.xsd",
            "Cfg/**/sslcert.xml": "sslca-cert.xsd",
            "Cfg/**/sslkey.xml": "sslca-key.xsd",
            "SSHbase/**/info.xml": "info.xsd",
            "TGenshi/**/info.xml": "info.xsd",
            "TCheetah/**/info.xml": "info.xsd",
            "Bundler/*.xml": "bundle.xsd",
            "Bundler/*.genshi": "bundle.xsd",
            "Pkgmgr/*.xml": "pkglist.xsd",
            "Rules/*.xml": "rules.xsd",
            "Defaults/*.xml": "defaults.xsd",
            "etc/report-configuration.xml": "report-configuration.xsd",
            "Deps/*.xml": "deps.xsd",
            "Decisions/*.xml": "decisions.xsd",
            "Packages/sources.xml": "packages.xsd",
            "GroupPatterns/config.xml": "grouppatterns.xsd",
            "AWSTags/config.xml": "awstags.xsd",
            "NagiosGen/config.xml": "nagiosgen.xsd",
            "FileProbes/config.xml": "fileprobes.xsd",
            "GroupLogic/groups.xml": "grouplogic.xsd",
        }

        #: A dict of <file glob>: <list of matching file paths>,
        #: populated by :func:`get_filelists`.  Also holds the
        #: special key ``props`` for Properties files.
        self.filelists = {}
        self.get_filelists()

        #: Command runner used to invoke ``xmllint`` for error output.
        self.cmd = Executor()

    def Run(self):
        """ Validate every file matched by :attr:`filesets` against
        its schema, then check Properties files. """
        for path, schemaname in self.filesets.items():
            filelist = self.filelists.get(path, [])
            if not filelist:
                # avoid loading schemas for empty file lists
                continue

            schemafile = os.path.join(Bcfg2.Options.setup.schema,
                                      schemaname)
            schema = self._load_schema(schemafile)
            if not schema:
                # _load_schema has already reported the problem
                continue

            # load each schema once and reuse it for all of its files
            for filename in filelist:
                self.validate(filename, schemafile, schema=schema)

        self.check_properties()

    @classmethod
    def Errors(cls):
        """ Return a dict mapping the names of the lint errors this
        plugin can raise to their default severity. """
        return {"schema-failed-to-parse": "warning",
                "properties-schema-not-found": "warning",
                "xml-failed-to-parse": "error",
                "xml-failed-to-read": "error",
                "xml-failed-to-verify": "error",
                "xinclude-does-not-exist": "error",
                "input-output-error": "error"}

    def check_properties(self):
        """ Check Properties files against their schemas.

        The schema for ``foo.xml`` is expected at ``foo.xsd`` in the
        same directory.  If no such schema exists, a
        ``properties-schema-not-found`` lint error is raised and the
        file is only checked for parseability. """
        for filename in self.filelists['props']:
            schemafile = "%s.xsd" % os.path.splitext(filename)[0]
            if os.path.exists(schemafile):
                self.validate(filename, schemafile)
            else:
                self.LintError("properties-schema-not-found",
                               "No schema found for %s" % filename)
                # ensure that it at least parses
                self.parse(filename)

    def parse(self, filename):
        """ Parse an XML file, raising the appropriate LintErrors if
        it can't be parsed or read.  Return the
        lxml.etree._ElementTree parsed from the file.

        XIncludes (including Bcfg2-style wildcard XIncludes) are only
        processed when ``self.files`` is None.

        :param filename: The full path to the file to parse
        :type filename: string
        :returns: lxml.etree._ElementTree - the parsed data, or False
                  if the file could not be read or parsed"""
        try:
            xdata = lxml.etree.parse(filename)
            if self.files is None:
                self._expand_wildcard_xincludes(xdata)
                xdata.xinclude()
            return xdata
        except (lxml.etree.XIncludeError, SyntaxError):
            # lxml.etree.XMLSyntaxError is a SyntaxError subclass.
            # Re-run the parse with xmllint purely to obtain
            # human-readable diagnostics for the error message.
            cmd = ["xmllint", "--noout"]
            if self.files is None:
                cmd.append("--xinclude")
            cmd.append(filename)
            result = self.cmd.run(cmd)
            self.LintError("xml-failed-to-parse",
                           "%s fails to parse:\n%s" %
                           (filename, result.stdout + result.stderr))
            return False
        except IOError:
            self.LintError("xml-failed-to-read",
                           "Failed to open file %s" % filename)
            return False

    def _expand_wildcard_xincludes(self, xdata):
        """ a lightweight version of
        :func:`Bcfg2.Server.Plugin.helpers.XMLFileBacked._follow_xincludes`

        Replace each ``xi:include`` element in ``xdata`` with one
        ``xi:include`` per file matching its (possibly wildcard)
        ``href``.  ``xdata`` is modified in place.  An include that
        matches no files raises ``xinclude-does-not-exist`` unless it
        has an ``xi:fallback`` child.

        :param xdata: The parsed document to expand XIncludes in
        :type xdata: lxml.etree._ElementTree
        """
        xinclude = '%sinclude' % Bcfg2.Server.XI_NAMESPACE
        for el in xdata.findall('//' + xinclude):
            name = el.get("href")
            if name is None:
                # an xi:include without href (e.g. xpointer-only) has
                # nothing to glob; leave it for lxml's own XInclude
                # processing instead of crashing on None
                continue

            if name.startswith("/"):
                fpath = name
            else:
                # relative hrefs are resolved against the directory
                # of the including document
                fpath = os.path.join(os.path.dirname(xdata.docinfo.URL),
                                     name)

            # expand globs in xinclude, a bcfg2-specific extension
            extras = glob.glob(fpath)
            if not extras:
                msg = "%s: %s does not exist, skipping: %s" % \
                    (xdata.docinfo.URL, name, self.RenderXML(el))
                if el.findall('./%sfallback' % Bcfg2.Server.XI_NAMESPACE):
                    self.logger.debug(msg)
                else:
                    self.LintError("xinclude-does-not-exist", msg)

            # swap the original include for one include per match
            parent = el.getparent()
            parent.remove(el)
            for extra in extras:
                # never let a wildcard make a file include itself
                if extra != xdata.docinfo.URL:
                    lxml.etree.SubElement(parent, xinclude, href=extra)

    def validate(self, filename, schemafile, schema=None):
        """ Validate a file against the given schema.

        :param filename: The full path to the file to validate
        :type filename: string
        :param schemafile: The full path to the schema file to
                           validate against
        :type schemafile: string
        :param schema: The loaded schema to validate against.  This
                       can be used to avoid parsing a single schema
                       file for every file that needs to be validated
                       against it.
        :type schema: lxml.etree.Schema
        :returns: bool - True if the file validates, false otherwise
        """
        if schema is None:
            # if no schema object was provided, instantiate one
            schema = self._load_schema(schemafile)
            if not schema:
                return False

        datafile = self.parse(filename)
        if not datafile:
            return False

        if not schema.validate(datafile):
            # lxml rejected the document; run xmllint for readable
            # diagnostics.  The error is only reported if xmllint
            # itself does not succeed.
            cmd = ["xmllint"]
            if self.files is None:
                cmd.append("--xinclude")
            cmd.extend(["--noout", "--schema", schemafile, filename])
            result = self.cmd.run(cmd)
            if not result.success:
                self.LintError("xml-failed-to-verify",
                               "%s fails to verify:\n%s" %
                               (filename, result.stdout + result.stderr))
                return False
        return True

    def get_filelists(self):
        """ Get lists of different kinds of files to validate.  This
        doesn't return anything, but it sets
        :attr:`Bcfg2.Server.Lint.Validate.Validate.filelists` to a
        dict whose keys are path globs given in
        :attr:`Bcfg2.Server.Lint.Validate.Validate.filesets` and whose
        values are lists of the full paths to all files in the Bcfg2
        repository (or given with ``bcfg2-lint --stdin``) that match
        the glob.  The extra key ``props`` is set to the list of
        files matching ``Properties/*.xml``."""
        for path in self.filesets:
            if '/**/' in path and self.files is None:
                # no explicit file list: walk the repository subtree
                # and collect every file with the wanted basename
                fpath, fname = path.split('/**/')
                matches = []
                for root, _, files in os.walk(
                        os.path.join(Bcfg2.Options.setup.repository,
                                     fpath)):
                    matches.extend(os.path.join(root, f)
                                   for f in files
                                   if f == fname)
                self.filelists[path] = matches
            else:
                self.filelists[path] = self.list_matching_files(path)

        self.filelists['props'] = self.list_matching_files("Properties/*.xml")

    def _load_schema(self, filename):
        """ Load an XML schema document, returning the Schema object
        and raising appropriate lint errors on failure.

        :param filename: The full path to the schema file to load.
        :type filename: string
        :returns: lxml.etree.Schema - The loaded schema data, or None
                  if the schema could not be read or processed
        """
        try:
            return lxml.etree.XMLSchema(lxml.etree.parse(filename))
        except IOError:
            err = sys.exc_info()[1]
            self.LintError("input-output-error", str(err))
        except (lxml.etree.XMLSchemaParseError,
                lxml.etree.XMLSyntaxError):
            # XMLSyntaxError: the .xsd is not well-formed XML;
            # XMLSchemaParseError: well-formed but not a valid schema.
            # Either way, report it rather than aborting the lint run.
            err = sys.exc_info()[1]
            self.LintError("schema-failed-to-parse",
                           "Failed to process schema %s: %s" %
                           (filename, err))
        return None
|