/usr/lib/python2.7/dist-packages/jnpr/junos/factory/view.py is in python-junos-eznc 2.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 | import warnings
from contextlib import contextmanager
from copy import deepcopy
from lxml import etree
import json
import sys
from jnpr.junos.factory.viewfields import ViewFields
from jnpr.junos.factory.to_json import TableViewJSONEncoder
class View(object):
"""
View is the base-class that makes extracting values from XML
data appear as objects with attributes.
"""
ITEM_NAME_XPATH = 'name'
FIELDS = {}
GROUPS = None
# -------------------------------------------------------------------------
# CONSTRUCTOR
# -------------------------------------------------------------------------
def __init__(self, table, view_xml):
"""
:table:
instance of the RunstatTable
:view_xml:
this should be an lxml etree Elemenet object. This
constructor also accepts a list with a single item/XML
"""
# if as_xml is passed as a list, make sure it only has
# a single item, common response from an xpath search
if isinstance(view_xml, list):
if 1 == len(view_xml):
view_xml = view_xml[0]
else:
raise ValueError("constructor only accepts a single item")
# now ensure that the thing provided is an lxml etree Element
if not isinstance(view_xml, etree._Element):
raise ValueError("constructor only accecpts lxml.etree._Element")
self._table = table
self.ITEM_NAME_XPATH = table.ITEM_NAME_XPATH
self._init_xml(view_xml)
def _init_xml(self, given_xml):
self._xml = given_xml
if self.GROUPS is not None:
self._groups = {}
for xg_name, xg_xpath in self.GROUPS.items():
xg_xml = self._xml.xpath(xg_xpath)
# @@@ this is technically an error; need to trap it
if not len(xg_xml):
continue
self._groups[xg_name] = xg_xml[0]
# -------------------------------------------------------------------------
# PROPERTIES
# -------------------------------------------------------------------------
@property
def T(self):
""" return the Table instance for the View """
return self._table
@property
def D(self):
""" return the Device instance for this View """
return self.T.D
@property
def name(self):
""" return the name of view item """
if self.ITEM_NAME_XPATH is None:
return self._table.D.hostname
if isinstance(self.ITEM_NAME_XPATH, str):
# xpath union key
if ' | ' in self.ITEM_NAME_XPATH:
return self._xml.xpath(self.ITEM_NAME_XPATH)[0].text.strip()
# simple key
return self._xml.findtext(self.ITEM_NAME_XPATH).strip()
else:
# composite key
# keys with missing XPATH nodes are set to None
keys = []
for i in self.ITEM_NAME_XPATH:
try:
keys.append(self.xml.xpath(i)[0].text.strip())
except:
keys.append(None)
return tuple(keys)
# ALIAS key <=> name
key = name
@property
def xml(self):
""" returns the XML associated to the item """
return self._xml
# -------------------------------------------------------------------------
# METHODS
# -------------------------------------------------------------------------
def keys(self):
""" list of view keys, i.e. field names """
return self.FIELDS.keys()
def values(self):
""" list of view values """
return [getattr(self, field) for field in self.keys()]
def items(self):
""" list of tuple(key,value) """
return zip(self.keys(), self.values())
def _updater_instance(self, more):
""" called from extend """
if hasattr(more, 'fields'):
self.FIELDS = deepcopy(self.__class__.FIELDS)
self.FIELDS.update(more.fields.end)
if hasattr(more, 'groups'):
self.GROUPS = deepcopy(self.__class__.GROUPS)
self.GROUPS.update(more.groups)
def _updater_class(self, more):
""" called from extend """
if hasattr(more, 'fields'):
self.FIELDS.update(more.fields.end)
if hasattr(more, 'groups'):
self.GROUPS.update(more.groups)
@contextmanager
def updater(self, fields=True, groups=False, all=True, **kvargs):
"""
provide the ability for subclassing objects to extend the
definitions of the fields. this is implemented as a
context manager with the form called from the subclass
constructor:
with self.extend() as more:
more.fields = <dict>
more.groups = <dict> # optional
"""
# ---------------------------------------------------------------------
# create a new object class so we can attach stuff to it arbitrarily.
# then pass that object to the caller, yo!
# ---------------------------------------------------------------------
more = type('RunstatViewMore', (object,), {})()
if fields is True:
more.fields = ViewFields()
# ---------------------------------------------------------------------
# callback through context manager
# ---------------------------------------------------------------------
yield more
updater = self._updater_class if all is True else \
self._updater_instance
updater(more)
def asview(self, view_cls):
""" create a new View object for this item """
return view_cls(self._table, self._xml)
def refresh(self):
"""
~~~ EXPERIMENTAL ~~~
refresh the data from the Junos device. this only works if the table
provides an "args_key", does not update the original table, just this
specific view/item
"""
warnings.warn("Experimental method: refresh")
if self._table.can_refresh is not True:
raise RuntimeError("table does not support this feature")
# create a new table instance that gets only the specific named
# value of this view
tbl_xml = self._table._rpc_get(self.name)
new_xml = tbl_xml.xpath(self._table.ITEM_XPATH)[0]
self._init_xml(new_xml)
return self
def to_json(self):
"""
:returns: JSON encoded string of entire View contents
"""
return json.dumps(self, cls=TableViewJSONEncoder)
# -------------------------------------------------------------------------
# OVERLOADS
# -------------------------------------------------------------------------
def __repr__(self):
""" returns the name of the View with the associate item name """
return "%s:%s" % (self.__class__.__name__, self.name)
def __getattr__(self, name):
"""
returns a view item value, called as :obj.name:
"""
item = self.FIELDS.get(name)
if item is None:
raise ValueError("Unknown field: '%s'" % name)
if 'table' in item:
# if this is a sub-table, then return that now
return item['table'](self.D, self._xml)
# otherwise, not a sub-table, and handle the field
astype = item.get('astype', str)
if 'group' in item:
if item['group'] in self._groups:
found = self._groups[item['group']].xpath(item['xpath'])
else:
return
else:
found = self._xml.xpath(item['xpath'])
len_found = len(found)
if astype is bool:
# handle the boolean flag case separately
return bool(len_found)
if not len_found:
# even for the case of numbers, do not set the value. we
# want to detect "does not exist" vs. defaulting to 0
# -- 2013-nov-19, JLS.
return None
try:
# added exception handler to catch malformed xpath expressesion
# -- 2013-nov-19, JLS.
# added support to handle multiple xpath values, i.e. a list of
# things that have the same xpath expression (common in configs)
# -- 2031-dec-06, JLS
# added support to use the element tag if the text is empty
def _munch(x):
if sys.version<'3':
as_str = x if isinstance(x, str) else x.text
if isinstance(as_str, unicode):
as_str = as_str.encode('ascii','replace')
else:
as_str = x if isinstance(x, str) else x.text
if as_str is not None:
as_str = as_str.strip()
if not as_str:
as_str = x.tag # use 'not' to test for empty
return astype(as_str)
if 1 == len_found:
return _munch(found[0])
return [_munch(this) for this in found]
except:
raise RuntimeError("Unable to handle field:'%s'" % name)
# and if we are here, then we didn't handle the field.
raise RuntimeError("Unable to handle field:'%s'" % name)
def __getitem__(self, name):
"""
allow the caller to extract field values using :obj['name']:
the same way they would do :obj.name:
"""
return getattr(self, name)
|