/usr/lib/python3/dist-packages/provisioningserver/tags.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2012-2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Cluster-side evaluation of tags."""
# Public names exported by this module on star-import.
__all__ = [
    'merge_details',
    'merge_details_cleanly',
    'process_node_tags',
]
from collections import OrderedDict
from functools import partial
import http.client
import json
import urllib.error
import urllib.parse
import urllib.request
import bson
from lxml import etree
from provisioningserver.logger import get_maas_logger
from provisioningserver.utils import classify
from provisioningserver.utils.xpath import try_match_xpath
# Module-wide logger, registered under the "tag_processing" name.
maaslog = get_maas_logger("tag_processing")
# An example laptop's lshw XML dump was 135kB. An example lab's LLDP
# XML dump was 1.6kB. A batch size of 100 would mean downloading ~14MB
# from the region controller, which seems workable. The previous batch
# size of 1000 would have resulted in a ~140MB download, which, on the
# face of it, appears excessive.
# Used by `process_all` when the caller does not pass a batch size.
DEFAULT_BATCH_SIZE = 100
def process_response(response):
    """Validate an HTTP response and decode its body.

    Any status other than 200 OK is turned into an `HTTPError`. For a
    200 response the body is read and then decoded according to the
    declared content type: BSON (``application/bson``) and JSON
    (``application/json``) documents are decoded, anything else is
    handed back as the raw bytes that were read.

    :param response: The result of MAASClient.get/post/etc.
    :type response: urllib.request.addinfourl (a file-like object that
        has a .code attribute.)
    :return: The decoded document, or the undecoded body.
    :raises urllib.error.HTTPError: If the status code is not 200.
    """
    status = response.code
    if status != http.client.OK:
        text_status = http.client.responses.get(status, '<unknown>')
        raise urllib.error.HTTPError(
            response.url, status, '%s, expected 200 OK' % text_status,
            response.headers, response.fp)

    body = response.read()
    headers = response.headers
    kind = headers.get_content_type()

    if kind == "application/bson":
        return bson.BSON(body).decode()
    if kind != "application/json":
        # Unknown content type: return the bytes untouched.
        return body

    # JSON is decoded as UTF-8 unless the response names a charset.
    charset = headers.get_content_charset()
    if charset is None:
        charset = "utf-8"
    return json.loads(body.decode(charset))
def get_details_for_nodes(client, system_ids):
    """Fetch the details document of every node in `system_ids`.

    One GET request with ``op='details'`` is issued per system ID, to
    ``/api/2.0/nodes/<system_id>/``, and each response is passed through
    `process_response`.

    :param client: MAAS client
    :param system_ids: Iterable of system IDs whose details to fetch
    :return: Dictionary mapping each system ID to its processed
        details response
    """
    return {
        system_id: process_response(
            client.get('/api/2.0/nodes/%s/' % system_id, op='details'))
        for system_id in system_ids
    }
def post_updated_nodes(
        client, rack_id, tag_name, tag_definition, added, removed):
    """Report which nodes gained or lost a tag.

    Posts an ``update_nodes`` operation to ``/api/2.0/tags/<tag_name>/``.

    :param client: MAAS client
    :param rack_id: System ID for rack controller
    :param tag_name: Name of tag
    :param tag_definition: Definition of the tag, sent along so that the
        work being done can be matched against the current value.
    :param added: Collection of nodes to add
    :param removed: Collection of nodes to remove
    :return: The processed response, or an empty dict when the request
        was answered with 409 CONFLICT.
    :raises urllib.error.HTTPError: For any failure other than CONFLICT.
    """
    endpoint = '/api/2.0/tags/%s/' % (tag_name,)
    summary = "Updating nodes for %s, adding %s removing %s" % (
        tag_name, len(added), len(removed))
    maaslog.debug(summary)
    try:
        response = client.post(
            endpoint, op='update_nodes', as_json=True,
            rack_controller=rack_id, definition=tag_definition,
            add=added, remove=removed)
        return process_response(response)
    except urllib.error.HTTPError as e:
        if e.code != http.client.CONFLICT:
            raise
        # Prefer the response body as the explanation; fall back to the
        # error's own message when there is no body to read.
        msg = e.msg if e.fp is None else e.fp.read()
        maaslog.info("Got a CONFLICT while updating tag: %s", msg)
        return {}
def _details_prepare_merge(details):
    """Return a copy of `details` plus a fresh ``list`` root element."""
    # Later steps may mutate the mapping, so work on a shallow copy and
    # leave the caller's data alone.
    copied = details.copy()
    # Build the nsmap as an OrderedDict over the sorted namespace names,
    # each name serving as both prefix and URI, so that lxml serializes
    # the namespace declarations in a stable order.
    namespaces = sorted(copied)
    nsmap = OrderedDict(zip(namespaces, namespaces))
    # Everything is rooted in a namespace-less element; passing the
    # nsmap here ensures prefixes are preserved when dumping later.
    # The lshw detail's root will take this element's place if lshw data
    # is present. When it is not, this element still carries the same
    # tag as an lshw tree's root, so XPath expressions written against
    # lshw output, e.g. "/list//{lldp}something", keep working.
    root = etree.Element("list", nsmap=nsmap)
    # Both return values are new objects owned by the caller.
    return copied, root
def _details_make_backwards_compatible(details, root):
    """Make the lshw detail, if usable, the root of the document.

    Returns the (possibly mutated) `details` and the element that
    should be treated as the root from now on.
    """
    # For backward-compatibility, lshw details, when available, form
    # the root of the composite document.
    xmldata = details.get("lshw")
    if xmldata is None:
        return details, root
    try:
        lshw = etree.fromstring(xmldata)
    except etree.XMLSyntaxError as e:
        maaslog.warning("Invalid lshw details: %s", e)
        # Drop the broken entry so it is not processed again later.
        del details["lshw"]
        return details, root
    # The existing root is being discarded as the document root, but
    # the lshw tree adopts its nsmap by becoming its child.
    root.append(lshw)
    return details, lshw
def _details_do_merge(details, root):
    """Append every parseable detail to `root` and return the tree."""
    # Namespaces are visited in sorted order.
    for namespace in sorted(details):
        xmldata = details[namespace]
        if xmldata is None:
            continue
        try:
            detail = etree.fromstring(xmldata)
        except etree.XMLSyntaxError as e:
            maaslog.warning("Invalid %s details: %s", namespace, e)
            continue
        # Move all unqualified elements into this detail's namespace.
        for elem in detail.iter("{}*"):
            elem.tag = etree.QName(namespace, elem.tag)
        root.append(detail)
    # Re-home `root` in a new tree so that XPath expressions like
    # "/some-tag" work correctly. Without this, when well-formed lshw
    # data has replaced the root (the backward-compatibility hack),
    # expressions would be evaluated from the originally created root
    # element, which is by then the parent of the current `root`.
    return etree.ElementTree(root)
def merge_details(details):
    """Combine a node's details into one XML document.

    `details` should be of the form::

      {"name": xml-as-bytes, "name2": xml-as-bytes, ...}

    Each `name` is the namespace (and prefix) under which that detail's
    XML is placed in the composite document; elements of a detail
    document that have no namespace are moved into that namespace.

    The ``lshw`` detail gets special treatment, purely for backwards
    compatibility. If present, it forms the root of the composite
    document with its namespaces unchanged, and it is additionally
    included in the composite document in the ``lshw`` namespace.

    The returned document is always rooted with a ``list`` element.
    """
    prepared = _details_prepare_merge(details)
    compatible = _details_make_backwards_compatible(*prepared)
    return _details_do_merge(*compatible)
def merge_details_cleanly(details):
    """Combine a node's details into one XML document, uniformly.

    `details` should be of the form::

      {"name": xml-as-bytes, "name2": xml-as-bytes, ...}

    Each `name` is the namespace (and prefix) under which that detail's
    XML is placed in the composite document; elements of a detail
    document that have no namespace are moved into that namespace.

    This works like `merge_details`, except that the ``lshw`` detail is
    not treated specially. The result of this function is not
    compatible with XPath expressions created for old releases of MAAS.

    The returned document is always rooted with a ``list`` element.
    """
    return _details_do_merge(*_details_prepare_merge(details))
def gen_batch_slices(count, size):
    """Yield `slice`s that divide `count` objects into batches.

    The batches are evenly distributed; no two batches differ in
    length by more than 1.

    Every slice carries a step, so the batches are interleaved:
    slicing a list with these and concatenating the results gives all
    of the elements, but not in their original order.

    :return: An iterator of `slice`s.
    """
    whole, leftover = divmod(count, size)
    # A partial batch's worth of leftovers needs one more batch.
    total = whole + 1 if leftover > 0 else whole
    yield from (slice(start, None, total) for start in range(total))
def gen_batches(things, batch_size):
    """Divide `things` into even batches of <= `batch_size`.

    The batches are produced with the slices from `gen_batch_slices`,
    which do not preserve the original ordering across batches.

    :type things: `list`, or anything else that can be sliced.
    :return: An iterator of `slice`s of `things`.
    """
    return (
        things[batch_slice]
        for batch_slice in gen_batch_slices(len(things), batch_size)
    )
def gen_node_details(client, batches):
    """Yield merged details for every node in `batches`.

    Details are fetched lazily, one batch at a time, but callers only
    ever see a flat stream of results.

    :return: An iterator of ``(system-id, details-document)`` tuples.
    """
    for batch in batches:
        fetched = get_details_for_nodes(client, batch)
        for system_id, details in fetched.items():
            yield system_id, merge_details(details)
def process_all(client, rack_id, tag_name, tag_definition, system_ids,
                xpath, batch_size=None):
    """Evaluate `xpath` against the given nodes and post the outcome.

    Node details are fetched in batches, classified with `xpath`, and
    the two resulting groups are passed to `post_updated_nodes` as the
    nodes to add and the nodes to remove.

    :param client: MAAS client
    :param rack_id: System ID for rack controller
    :param tag_name: Name of tag
    :param tag_definition: Definition of the tag
    :param system_ids: Sliceable collection of system IDs to evaluate
    :param xpath: The XPath matcher handed to `try_match_xpath`
    :param batch_size: Batch size for fetching details; falls back to
        `DEFAULT_BATCH_SIZE` when None.
    """
    maaslog.debug(
        "processing %d system_ids for tag %s.",
        len(system_ids), tag_name)
    effective_size = (
        DEFAULT_BATCH_SIZE if batch_size is None else batch_size)
    node_details = gen_node_details(
        client, gen_batches(system_ids, effective_size))
    matcher = partial(try_match_xpath, xpath, logger=maaslog)
    # NOTE(review): `classify` presumably splits the system IDs into
    # matching and non-matching groups -- confirm against its definition.
    nodes_matched, nodes_unmatched = classify(matcher, node_details)
    post_updated_nodes(
        client, rack_id, tag_name, tag_definition,
        nodes_matched, nodes_unmatched)
def process_node_tags(
        rack_id, nodes, tag_name, tag_definition, tag_nsmap,
        client, batch_size=None):
    """Re-evaluate a new or changed tag definition against `nodes`.

    :param rack_id: System ID for the rack controller.
    :param nodes: List of nodes to process tags for; each entry must
        provide a ``"system_id"`` key.
    :param tag_name: Name of the tag to update nodes for
    :param tag_definition: Tag definition, compiled as an XPath
        expression.
    :param tag_nsmap: Namespace mapping used when compiling the tag
        definition.
    :param client: A `MAASClient` used to fetch the node's details via
        calls to the web API.
    :param batch_size: Size of batch
    """
    # Compile the expression first, so that a bad definition fails
    # before a bunch of data is exchanged with the server.
    xpath = etree.XPath(tag_definition, namespaces=tag_nsmap)
    system_ids = [entry["system_id"] for entry in nodes]
    process_all(
        client, rack_id, tag_name, tag_definition, system_ids, xpath,
        batch_size=batch_size)
|