/usr/share/pyshared/provisioningserver/tags.py is in python-maas-provisioningserver 1.2+bzr1373+dfsg-0ubuntu1~12.04.6.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2012 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""Celery jobs for managing tags.
"""
from __future__ import (
absolute_import,
print_function,
unicode_literals,
)
__metaclass__ = type
__all__ = [
'MissingCredentials',
'process_node_tags',
]
import httplib
from logging import getLogger
import urllib2
from apiclient.maas_client import (
MAASClient,
MAASDispatcher,
MAASOAuth,
)
from lxml import etree
from provisioningserver.auth import (
get_recorded_api_credentials,
get_recorded_nodegroup_uuid,
)
from provisioningserver.cluster_config import get_maas_url
import simplejson as json
logger = getLogger(__name__)
class MissingCredentials(Exception):
    """Signals that the MAAS URL or the API credentials are not set yet."""
# Number of system_ids sent per hardware-details request when the caller of
# process_all() does not pass an explicit batch_size.
DEFAULT_BATCH_SIZE = 1000
def get_cached_knowledge():
    """Build an API client from the recorded credentials and nodegroup UUID.

    If either the API credentials or the nodegroup UUID has not been recorded
    yet, an error is logged and `(None, None)` is returned instead of a
    usable pair.

    :return: (client, nodegroup_uuid), or (None, None) if anything is missing
    """
    credentials = get_recorded_api_credentials()
    if credentials is None:
        logger.error("Not updating tags: don't have API key yet.")
        return None, None

    uuid = get_recorded_nodegroup_uuid()
    if uuid is None:
        logger.error("Not updating tags: don't have UUID yet.")
        return None, None

    oauth = MAASOAuth(*credentials)
    maas_client = MAASClient(oauth, MAASDispatcher(), get_maas_url())
    return maas_client, uuid
def process_response(response):
    """Decode the JSON body of a response that must have status 200 OK.

    :param response: The result of MAASClient.get/post/etc.
    :type response: urllib2.addinfourl (a file-like object that has a .code
        attribute.)
    :return: The response body parsed as JSON.
    :raises AssertionError: If the response status is anything but 200 OK.
    """
    status = response.code
    if status == httplib.OK:
        return json.loads(response.read())
    # Translate the numeric code into its standard reason phrase for the
    # error message; codes httplib does not know are shown as '<unknown>'.
    reason = httplib.responses.get(status, '<unknown>')
    raise AssertionError(
        'Unexpected HTTP status: %s %s, expected 200 OK' % (status, reason))
def get_nodes_for_node_group(client, nodegroup_uuid):
    """Retrieve the UUIDs of nodes in a particular group.

    :param client: MAAS client instance
    :param nodegroup_uuid: Node group for which to retrieve nodes
    :return: The decoded JSON response of the `list_nodes` operation
        (a list of UUIDs for nodes in the nodegroup)
    :raises AssertionError: If the API does not answer with 200 OK.
    """
    # Format with a real one-element tuple so that the value is always
    # treated as a single argument, matching the other API path builders.
    path = '/api/1.0/nodegroups/%s/' % (nodegroup_uuid,)
    return process_response(client.get(path, op='list_nodes'))
def get_hardware_details_for_nodes(client, nodegroup_uuid, system_ids):
    """Fetch the hardware details (lshw output) for a set of nodes.

    :param client: MAAS client
    :param nodegroup_uuid: UUID of the node group the nodes belong to; it is
        used to build the API path
    :param system_ids: List of UUIDs of systems for which to fetch lshw data
    :return: The decoded JSON response of the `node_hardware_details`
        operation. process_batch() consumes it as a sequence of
        (system_id, lshw output) pairs.
    """
    response = client.post(
        '/api/1.0/nodegroups/%s/' % (nodegroup_uuid,),
        op='node_hardware_details',
        as_json=True,
        system_ids=system_ids)
    return process_response(response)
def post_updated_nodes(client, tag_name, tag_definition, uuid, added, removed):
    """Update the nodes relevant for a particular tag.

    :param client: MAAS client
    :param tag_name: Name of tag
    :param tag_definition: Definition of the tag, used to assure that the work
        being done matches the current value.
    :param uuid: NodeGroup uuid of this worker. Needed for security
        permissions. (The nodegroup worker is only allowed to touch nodes in
        its nodegroup, otherwise you need to be a superuser.)
    :param added: Collection of nodes to add
    :param removed: Collection of nodes to remove
    :return: The decoded JSON response, or an empty dict if the server
        answered with 409 CONFLICT.
    :raises urllib2.HTTPError: For any HTTP error other than CONFLICT.
    """
    path = '/api/1.0/tags/%s/' % (tag_name,)
    # Hand the arguments to the logger instead of pre-formatting, so the
    # message is only built when debug logging is actually enabled.
    logger.debug(
        "Updating nodes for %s %s, adding %s removing %s",
        tag_name, uuid, len(added), len(removed))
    try:
        return process_response(client.post(
            path, op='update_nodes', as_json=True, nodegroup=uuid,
            definition=tag_definition, add=added, remove=removed))
    except urllib2.HTTPError as e:
        if e.code != httplib.CONFLICT:
            raise
        # A CONFLICT is reported and swallowed rather than treated as a
        # failure. Prefer the response body as the explanation when one is
        # available.
        if e.fp is not None:
            msg = e.fp.read()
        else:
            msg = e.msg
        logger.info("Got a CONFLICT while updating tag: %s", msg)
        return {}
def process_batch(xpath, hardware_details):
    """Split one batch of nodes into those that match `xpath` and the rest.

    :param xpath: Callable applied to each node's parsed XML; a truthy
        result counts as a match.
    :param hardware_details: Iterable of (system_id, hw_xml) pairs. hw_xml
        may be None when a node has no hardware details.
    :return: (matched_nodes, unmatched_nodes), two lists of system_ids.
        Nodes whose details are missing or are not well-formed XML are
        reported as unmatched.
    """
    matched_nodes = []
    unmatched_nodes = []
    for system_id, hw_xml in hardware_details:
        matched = False
        if hw_xml is not None:
            try:
                xml = etree.XML(hw_xml)
            except etree.XMLSyntaxError as e:
                # Unparseable details are not fatal: the node is simply
                # treated as not matching the tag.
                logger.debug(
                    "Invalid hardware_details for %s: %s", system_id, e)
            else:
                if xpath(xml):
                    matched = True
        if matched:
            matched_nodes.append(system_id)
        else:
            unmatched_nodes.append(system_id)
    return matched_nodes, unmatched_nodes
def process_all(client, tag_name, tag_definition, nodegroup_uuid, system_ids,
                xpath, batch_size=None):
    """Evaluate a tag against all given nodes and upload the result.

    Hardware details are fetched and evaluated in batches, but the matched
    and unmatched nodes are posted back in a single update.

    :param client: MAAS client
    :param tag_name: Name of the tag being evaluated
    :param tag_definition: Definition of the tag, sent along with the update
    :param nodegroup_uuid: UUID of the node group being processed
    :param system_ids: Sliceable sequence of system_ids to evaluate
    :param xpath: Callable applied to each node's parsed hardware XML
    :param batch_size: Number of nodes to fetch per request; defaults to
        DEFAULT_BATCH_SIZE when None.
    """
    if batch_size is None:
        batch_size = DEFAULT_BATCH_SIZE
    all_matched = []
    all_unmatched = []
    logger.debug(
        "processing %d system_ids for tag %s nodegroup %s",
        len(system_ids), tag_name, nodegroup_uuid)
    for i in range(0, len(system_ids), batch_size):
        selected_ids = system_ids[i:i + batch_size]
        details = get_hardware_details_for_nodes(
            client, nodegroup_uuid, selected_ids)
        matched, unmatched = process_batch(xpath, details)
        logger.debug(
            "processing batch of %d ids received %d details"
            " (%d matched, %d unmatched)",
            len(selected_ids), len(details), len(matched), len(unmatched))
        all_matched.extend(matched)
        all_unmatched.extend(unmatched)
    # Upload all updates for one nodegroup at one time. This should be no more
    # than ~41*10,000 = 410kB. That should take <1s even on a 10Mbit network.
    # This also allows us to track if a nodegroup has been processed in the DB,
    # without having to add another API call.
    post_updated_nodes(
        client, tag_name, tag_definition, nodegroup_uuid,
        all_matched, all_unmatched)
def process_node_tags(tag_name, tag_definition, batch_size=None):
    """Update the nodes for a new/changed tag definition.

    :param tag_name: Name of the tag to update nodes for
    :param tag_definition: Tag definition
    :param batch_size: Size of batch
    :raises MissingCredentials: If the API client or the nodegroup UUID is
        not available yet.
    """
    client, nodegroup_uuid = get_cached_knowledge()
    if not all([client, nodegroup_uuid]):
        logger.error(
            "Unable to update tag: %s for definition %r. "
            "Please refresh secrets, then rebuild this tag.",
            tag_name, tag_definition)
        raise MissingCredentials()
    # We evaluate this early, so we can fail before sending a bunch of data to
    # the server
    xpath = etree.XPath(tag_definition)
    # Get nodes to process
    system_ids = get_nodes_for_node_group(client, nodegroup_uuid)
    process_all(
        client, tag_name, tag_definition, nodegroup_uuid, system_ids, xpath,
        batch_size=batch_size)
|