/usr/lib/python2.7/dist-packages/jnpr/junos/device.py is in python-junos-eznc 2.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 | # stdlib
import os
import types
import platform
import warnings
import traceback
# stdlib, in support of the the 'probe' method
import socket
import datetime
import time
import sys
import json
import re
# 3rd-party packages
from lxml import etree
from ncclient import manager as netconf_ssh
import ncclient.transport.errors as NcErrors
import ncclient.operations.errors as NcOpErrors
from ncclient.operations import RPCError
import paramiko
import jinja2
# local modules
from jnpr.junos.rpcmeta import _RpcMetaExec
from jnpr.junos import exception as EzErrors
from jnpr.junos.facts import *
from jnpr.junos import jxml as JXML
from jnpr.junos.decorators import timeoutDecorator, normalizeDecorator
_MODULEPATH = os.path.dirname(__file__)
class _MyTemplateLoader(jinja2.BaseLoader):
"""
Create a jinja2 template loader class that can be used to
load templates from all over the filesystem, but defaults
to the CWD and the 'templates' directory of the module
"""
def __init__(self):
self.paths = ['.', os.path.join(_MODULEPATH, 'templates')]
def get_source(self, environment, template):
def _in_path(dir):
return os.path.exists(os.path.join(dir, template))
path = list(filter(_in_path, self.paths))
if not path:
raise jinja2.TemplateNotFound(template)
path = os.path.join(path[0], template)
mtime = os.path.getmtime(path)
with open(path) as f:
# You are trying to decode an object that is already decoded.
# You have a str, there is no need to decode from UTF-8 anymore.
# open already decodes to Unicode in Python 3 if you open in text mode.
# If you want to open it as bytes, so that you can then decode,
# you need to open with mode 'rb'.
source = f.read()
return source, path, lambda: mtime == os.path.getmtime(path)
_Jinja2ldr = jinja2.Environment(loader=_MyTemplateLoader())
class _Connection(object):
# ------------------------------------------------------------------------
# property: hostname
# ------------------------------------------------------------------------
@property
def hostname(self):
"""
:returns: the host-name of the Junos device.
"""
return self._hostname if (
self._hostname != 'localhost') else self.facts.get('hostname')
# ------------------------------------------------------------------------
# property: user
# ------------------------------------------------------------------------
@property
def user(self):
"""
:returns: the login user (str) accessing the Junos device
"""
return self._auth_user
# ------------------------------------------------------------------------
# property: password
# ------------------------------------------------------------------------
@property
def password(self):
"""
:returns: ``None`` - do not provide the password
"""
return None # read-only
@password.setter
def password(self, value):
"""
Change the authentication password value. This is handy in case
the calling program needs to attempt different passwords.
"""
self._auth_password = value
# ------------------------------------------------------------------------
# property: logfile
# ------------------------------------------------------------------------
@property
def logfile(self):
"""
:returns: exsiting logfile ``file`` object.
"""
return self._logfile
@logfile.setter
def logfile(self, value):
"""
Assigns an opened file object to the device for logging
If there is an open logfile, and 'value' is ``None`` or ``False``
then close the existing file.
:param file value: An open ``file`` object.
:returns: the new logfile ``file`` object
:raises ValueError:
When **value** is not a ``file`` object
"""
# got an existing file that we need to close
if (not value) and (None != self._logfile):
rc = self._logfile.close()
self._logfile = False
return rc
if sys.version < '3':
if not isinstance(value, file):
raise ValueError("value must be a file object")
else:
import io
if not isinstance(value, io.TextIOWrapper):
raise ValueError("value must be a file object")
self._logfile = value
return self._logfile
# ------------------------------------------------------------------------
# property: timeout
# ------------------------------------------------------------------------
@property
def timeout(self):
"""
:returns: the current RPC timeout value (int) in seconds.
"""
return self._conn.timeout
@timeout.setter
def timeout(self, value):
"""
Used to change the RPC timeout value (default=30 sec).
:param int value:
New timeout value in seconds
"""
self._conn.timeout = value
# ------------------------------------------------------------------------
# property: facts
# ------------------------------------------------------------------------
@property
def facts(self):
"""
:returns: Device fact dictionary
"""
return self._facts
@facts.setter
def facts(self, value):
""" read-only property """
raise RuntimeError("facts is read-only!")
# ------------------------------------------------------------------------
# property: port
# ------------------------------------------------------------------------
@property
def port(self):
"""
:returns: the port (str) to connect to the Junos device
"""
return self._port
def _sshconf_lkup(self):
if self._ssh_config:
sshconf_path = os.path.expanduser(self._ssh_config)
else:
home = os.getenv('HOME')
if not home:
return None
sshconf_path = os.path.join(os.getenv('HOME'), '.ssh/config')
if not os.path.exists(sshconf_path):
return None
else:
sshconf = paramiko.SSHConfig()
with open(sshconf_path, 'r') as fp:
sshconf.parse(fp)
found = sshconf.lookup(self._hostname)
self._hostname = found.get('hostname', self._hostname)
self._port = found.get('port', self._port)
self._conf_auth_user = found.get('user')
self._conf_ssh_private_key_file = found.get('identityfile')
return sshconf_path
def display_xml_rpc(self, command, format='xml'):
"""
Executes the CLI command and returns the CLI xml object by default.
For example::
print dev.display_xml_rpc('show version').tag
or
print dev.display_xml_rpc('show version', format='text')
:param str command:
The CLI command to retrieve XML RPC for, e.g. "show version"
:param str format:
The return format, by default is XML. You can optionally select
"text" to return the XML structure as a string.
"""
try:
command = command + '| display xml rpc'
rsp = self.rpc.cli(command)
if format == 'text':
encode = None if sys.version < '3' else 'unicode'
return etree.tostring(rsp[0], encoding=encode)
return rsp[0]
except:
return "invalid command: " + command
# ------------------------------------------------------------------------
# Template: retrieves a Jinja2 template
# ------------------------------------------------------------------------
def Template(self, filename, parent=None, gvars=None):
"""
Used to return a Jinja2 :class:`Template`.
:param str filename:
file-path to Jinja2 template file on local device
:returns: Jinja2 :class:`Template` give **filename**.
"""
return self._j2ldr.get_template(filename, parent, gvars)
# ------------------------------------------------------------------------
# property: manages
# ------------------------------------------------------------------------
@property
def manages(self):
"""
:returns:
``list`` of Resource Managers/Utilities attached to this
instance using the :meth:`bind` method.
"""
return self._manages
# ------------------------------------------------------------------------
# dealing with bind aspects
# ------------------------------------------------------------------------
def bind(self, *vargs, **kvargs):
"""
Used to attach things to this Device instance and make them a
property of the :class:Device instance. The most common use
for bind is attaching Utility instances to a :class:Device.
For example::
from jnpr.junos.utils.config import Config
dev.bind( cu=Config )
dev.cu.lock()
# ... load some changes
dev.cu.commit()
dev.cu.unlock()
:param list vargs:
A list of functions that will get bound as instance methods to
this Device instance.
.. warning:: Experimental.
:param new_property:
name/class pairs that will create resource-managers bound as
instance attributes to this Device instance. See code example above
"""
if len(vargs):
for fn in vargs:
# check for name clashes before binding
if hasattr(self, fn.__name__):
raise ValueError(
"request attribute name %s already exists" %
fn.__name__)
for fn in vargs:
# bind as instance method, majik.
if sys.version < '3':
self.__dict__[
fn.__name__] = types.MethodType(
fn,
self,
self.__class__)
else:
self.__dict__[
fn.__name__] = types.MethodType(
fn,
self.__class__)
return
# first verify that the names do not conflict with
# existing object attribute names
for name in kvargs.keys():
# check for name-clashes before binding
if hasattr(self, name):
raise ValueError(
"requested attribute name %s already exists" %
name)
# now instantiate items and bind to this :Device:
for name, thing in kvargs.items():
new_inst = thing(self)
self.__dict__[name] = new_inst
self._manages.append(name)
@property
def _sshconf_path(self):
return self._sshconf_lkup()
# ------------------------------------------------------------------------
# probe
# ------------------------------------------------------------------------
def probe(self, timeout=5, intvtimeout=1):
"""
Probe the device to determine if the Device can accept a remote
connection.
This method is meant to be called *prior* to :open():
This method will not work with ssh-jumphost environments.
:param int timeout:
The probe will report ``True``/``False`` if the device report
connectivity within this timeout (seconds)
:param int intvtimeout:
Timeout interval on the socket connection. Generally you should not
change this value, but you can if you want to twiddle the frequency
of the socket attempts on the connection
:returns: ``True`` if probe is successful, ``False`` otherwise
"""
start = datetime.datetime.now()
end = start + datetime.timedelta(seconds=timeout)
probe_ok = True
while datetime.datetime.now() < end:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(intvtimeout)
try:
s.connect((self.hostname, int(self._port)))
s.shutdown(socket.SHUT_RDWR)
s.close()
break
except:
time.sleep(1)
pass
else:
elapsed = datetime.datetime.now() - start
probe_ok = False
return probe_ok
# ------------------------------------------------------------------------
# cli - for cheating commands :-)
# ------------------------------------------------------------------------
def cli(self, command, format='text', warning=True):
"""
Executes the CLI command and returns the CLI text output by default.
:param str command:
The CLI command to execute, e.g. "show version"
:param str format:
The return format, by default is text. You can optionally select
"xml" to return the XML structure.
.. note::
You can also use this method to obtain the XML RPC command for a
given CLI command by using the pipe filter ``| display xml rpc``.
When you do this, the return value is the XML RPC command. For
example if you provide as the command ``show version | display xml rpc``,
you will get back the XML Element ``<get-software-information>``.
.. warning::
This function is provided for **DEBUG** purposes only!
**DO NOT** use this method for general automation purposes as
that puts you in the realm of "screen-scraping the CLI". The purpose of
the PyEZ framework is to migrate away from that tooling pattern.
Interaction with the device should be done via the RPC function.
.. warning::
You cannot use "pipe" filters with **command** such as ``| match``
or ``| count``, etc. The only value use of the "pipe" is for the
``| display xml rpc`` as noted above.
"""
if 'display xml rpc' not in command and warning is True:
warnings.simplefilter("always")
warnings.warn("CLI command is for debug use only!", RuntimeWarning)
warnings.resetwarnings()
try:
rsp = self.rpc.cli(command, format)
if isinstance(rsp, dict) and format.lower() == 'json':
return rsp
# rsp returned True means <rpc-reply> is empty, hence return
# empty str as would be the case on cli
# ex:
# <rpc-reply message-id="urn:uuid:281f624f-022b-11e6-bfa8">
# </rpc-reply>
if rsp is True:
return ''
if rsp.tag in ['output', 'rpc-reply']:
encode = None if sys.version < '3' else 'unicode'
return etree.tostring(rsp, method="text", with_tail=False,
encoding=encode)
if rsp.tag == 'configuration-information':
return rsp.findtext('configuration-output')
if rsp.tag == 'rpc':
return rsp[0]
return rsp
except EzErrors.RpcError as ex:
if str(ex) is not '':
return "%s: %s" % (str(ex), command)
else:
return "invalid command: " + command
except Exception as ex:
return "invalid command: " + command
# ------------------------------------------------------------------------
# execute
# ------------------------------------------------------------------------
@normalizeDecorator
@timeoutDecorator
def execute(self, rpc_cmd, **kvargs):
"""
Executes an XML RPC and returns results as either XML or native python
:param rpc_cmd:
can either be an XML Element or xml-as-string. In either case
the command starts with the specific command element, i.e., not the
<rpc> element itself
:param func to_py:
Is a caller provided function that takes the response and
will convert the results to native python types. all kvargs
will be passed to this function as well in the form::
to_py( self, rpc_rsp, **kvargs )
:raises ValueError:
When the **rpc_cmd** is of unknown origin
:raises PermissionError:
When the requested RPC command is not allowed due to
user-auth class privilege controls on Junos
:raises RpcError:
When an ``rpc-error`` element is contained in the RPC-reply
:returns:
RPC-reply as XML object. If **to_py** is provided, then
that function is called, and return of that function is
provided back to the caller; presumably to convert the XML to
native python data-types (e.g. ``dict``).
"""
if self.connected is not True:
raise EzErrors.ConnectClosedError(self)
if isinstance(rpc_cmd, str):
rpc_cmd_e = etree.XML(rpc_cmd)
elif isinstance(rpc_cmd, etree._Element):
rpc_cmd_e = rpc_cmd
else:
raise ValueError(
"Dont know what to do with rpc of type %s" %
rpc_cmd.__class__.__name__)
# invoking a bad RPC will cause a connection object exception
# will will be raised directly to the caller ... for now ...
# @@@ need to trap this and re-raise accordingly.
try:
rpc_rsp_e = self._rpc_reply(rpc_cmd_e)
except NcOpErrors.TimeoutExpiredError:
# err is a TimeoutExpiredError from ncclient,
# which has no such attribute as xml.
raise EzErrors.RpcTimeoutError(self, rpc_cmd_e.tag, self.timeout)
except NcErrors.TransportError:
raise EzErrors.ConnectClosedError(self)
except RPCError as err:
rsp = JXML.remove_namespaces(err.xml)
# see if this is a permission error
e = EzErrors.PermissionError if rsp.findtext('error-message') == \
'permission denied' \
else EzErrors.RpcError
raise e(cmd=rpc_cmd_e, rsp=rsp, errs=err)
# Something unexpected happened - raise it up
except Exception as err:
warnings.warn("An unknown exception occured - please report.",
RuntimeWarning)
raise
# From 14.2 onward, junos supports JSON, so now code can be written as
# dev.rpc.get_route_engine_information({'format': 'json'})
if rpc_cmd_e.attrib.get('format') in ['json', 'JSON']:
if self._facts == {}:
self.facts_refresh()
ver_info = self._facts['version_info']
if ver_info.major[0] >= 15 or \
(ver_info.major[0] == 14 and ver_info.major[1] >= 2):
try:
return json.loads(rpc_rsp_e.text)
except ValueError as ex:
# when data is {}{.*} types
if str(ex).startswith('Extra data'):
return json.loads(
re.sub('\s?{\s?}\s?', '', rpc_rsp_e.text))
else:
warnings.warn("Native JSON support is only from 14.2 onwards",
RuntimeWarning)
# This section is here for the possible use of something other than ncclient
# for RPCs that have embedded rpc-errors, need to check for those now
# rpc_errs = rpc_rsp_e.xpath('.//rpc-error')
# if len(rpc_errs):
# raise EzErrors.RpcError(cmd=rpc_cmd_e, rsp=rpc_errs[0])
# skip the <rpc-reply> element and pass the caller first child element
# generally speaking this is what they really want. If they want to
# uplevel they can always call the getparent() method on it.
try:
ret_rpc_rsp = rpc_rsp_e[0]
except IndexError:
# For cases where reply are like
# <rpc-reply>
# protocol: operation-failed
# error: device asdf not found
# </rpc-reply>
if rpc_rsp_e.text is not None and rpc_rsp_e.text.strip() is not '':
return rpc_rsp_e
# no children, so assume it means we are OK
return True
# if the caller provided a "to Python" conversion function, then invoke
# that now and return the results of that function. otherwise just
# return the RPC results as XML
if kvargs.get('to_py'):
return kvargs['to_py'](self, ret_rpc_rsp, **kvargs)
else:
return ret_rpc_rsp
# ------------------------------------------------------------------------
# facts
# ------------------------------------------------------------------------
def facts_refresh(self, exception_on_failure=False):
"""
Reload the facts from the Junos device into :attr:`facts` property.
:param bool exception_on_failure: To raise exception or warning when
facts gathering errors out.
"""
for gather in FACT_LIST:
try:
gather(self, self._facts)
except:
if exception_on_failure:
raise
warnings.warn('Facts gathering is incomplete. '
'To know the reason call "dev.facts_refresh(exception_on_failure=True)"',
RuntimeWarning)
return
# -----------------------------------------------------------------------
# OVERLOADS
# -----------------------------------------------------------------------
def __repr__(self):
return "Device(%s)" % self.hostname
class Device(_Connection):
"""
Junos Device class.
:attr:`ON_JUNOS`:
**READ-ONLY** -
Auto-set to ``True`` when this code is running on a Junos device,
vs. running on a local-server remotely connecting to a device.
:attr:`auto_probe`:
When non-zero the call to :meth:`open` will probe for NETCONF
reachability before proceeding with the NETCONF session establishment.
If you want to enable this behavior by default, you could do the
following in your code::
from jnpr.junos import Device
# set all device open to auto-probe with timeout of 10 sec
Device.auto_probe = 10
dev = Device( ... )
dev.open() # this will probe before attempting NETCONF connect
"""
ON_JUNOS = platform.system().upper() == 'JUNOS' or \
platform.release().startswith('JNPR')
auto_probe = 0 # default is no auto-probe
# -------------------------------------------------------------------------
# PROPERTIES
# -------------------------------------------------------------------------
# ------------------------------------------------------------------------
# property: transform
# ------------------------------------------------------------------------
@property
def transform(self):
"""
:returns: the current RPC XML Transformation.
"""
return self._conn._device_handler.transform_reply
@transform.setter
def transform(self, func):
"""
Used to change the RPC XML Transformation.
:param lambda value:
New transform lambda
"""
self._conn._device_handler.transform_reply = func
# -----------------------------------------------------------------------
# CONSTRUCTOR
# -----------------------------------------------------------------------
def __new__(cls, *args, **kwargs):
if kwargs.get('port') in [23, '23'] or kwargs.get('mode'):
from jnpr.junos.console import Console
instance = object.__new__(Console, *args, **kwargs)
# Python only calls __init__() if the object returned from
# __new__() is an instance of the class in which the __new__()
# method is contained (here Device class). Hence calling __init__
# explicitly.
kwargs['host'] = args[0] if len(args) else kwargs.get('host')
instance.__init__(**kwargs)
return instance
else:
if sys.version < '3':
return super(Device, cls).__new__(cls, *args, **kwargs)
else:
return super().__new__(cls)
def __init__(self, *vargs, **kvargs):
"""
Device object constructor.
:param str vargs[0]: host-name or ipaddress. This is an
alternative for **host**
:param str host:
**REQUIRED** host-name or ipaddress of target device
:param str user:
*OPTIONAL* login user-name, uses $USER if not provided
:param str passwd:
*OPTIONAL* if not provided, assumed ssh-keys are enforced
:param int port:
*OPTIONAL* NETCONF port (defaults to 830)
:param bool gather_facts:
*OPTIONAL* For ssh mode default is ``True``. In case of console
connection over telnet/serial it defaults to ``False``.
If ``False`` then facts are not gathered on call to :meth:`open`
:param str mode:
*OPTIONAL* mode, mode for console connection (telnet/serial)
:param int baud:
*OPTIONAL* baud, Used during serial console mode, default baud
rate is 9600
:param int attempts:
*OPTIONAL* attempts, for console connection. default is 10
:param bool auto_probe:
*OPTIONAL* if non-zero then this enables auto_probe at time of
:meth:`open` and defines the amount of time(sec) for the
probe timeout
:param str ssh_private_key_file:
*OPTIONAL* The path to the SSH private key file.
This can be used if you need to provide a private key rather than
loading the key into the ssh-key-ring/environment. if your
ssh-key requires a password, then you must provide it via
**passwd**
:param str ssh_config:
*OPTIONAL* The path to the SSH configuration file.
This can be used to load SSH information from a configuration file.
By default ~/.ssh/config is queried.
:param bool normalize:
*OPTIONAL* default is ``False``. If ``True`` then the
XML returned by :meth:`execute` will have whitespace normalized
"""
# ----------------------------------------
# setup instance connection/open variables
# ----------------------------------------
hostname = vargs[0] if len(vargs) else kvargs.get('host')
self._port = kvargs.get('port', 830)
self._gather_facts = kvargs.get('gather_facts', True)
self._normalize = kvargs.get('normalize', False)
self._auto_probe = kvargs.get('auto_probe', self.__class__.auto_probe)
if self.__class__.ON_JUNOS is True and hostname is None:
# ---------------------------------
# running on a Junos device locally
# ---------------------------------
self._auth_user = None
self._auth_password = None
self._hostname = 'localhost'
self._ssh_private_key_file = None
self._ssh_config = None
else:
# --------------------------
# making a remote connection
# --------------------------
if hostname is None:
raise ValueError("You must provide the 'host' value")
self._hostname = hostname
# user will default to $USER
self._auth_user = os.getenv('USER')
self._conf_auth_user = None
self._conf_ssh_private_key_file = None
# user can get updated by ssh_config
self._ssh_config = kvargs.get('ssh_config')
# but if user or private key is explicit from call, then use it.
self._auth_user = kvargs.get('user') or self._conf_auth_user or \
self._auth_user
self._ssh_private_key_file = kvargs.get('ssh_private_key_file') \
or self._conf_ssh_private_key_file
self._auth_password = kvargs.get(
'password') or kvargs.get('passwd')
# -----------------------------
# initialize instance variables
# ------------------------------
self._conn = None
self._j2ldr = _Jinja2ldr
self._manages = []
self._facts = {}
# public attributes
self.connected = False
self.rpc = _RpcMetaExec(self)
# -----------------------------------------------------------------------
# Basic device methods
# -----------------------------------------------------------------------
def open(self, *vargs, **kvargs):
"""
Opens a connection to the device using existing login/auth
information.
:param bool gather_facts:
If set to ``True``/``False`` will override the device
instance value for only this open process
:param bool auto_probe:
If non-zero then this enables auto_probe and defines the amount
of time/seconds for the probe timeout
:param bool normalize:
If set to ``True``/``False`` will override the device
instance value for only this open process
:returns Device: Device instance (*self*).
:raises ProbeError:
When **auto_probe** is ``True`` and the probe activity
exceeds the timeout
:raises ConnectAuthError:
When provided authentication credentials fail to login
:raises ConnectRefusedError:
When the device does not have NETCONF enabled
:raises ConnectTimeoutError:
When the the :meth:`Device.timeout` value is exceeded
during the attempt to connect to the remote device
:raises ConnectError:
When an error, other than the above, occurs. The
originating ``Exception`` is assigned as ``err._orig``
and re-raised to the caller.
"""
auto_probe = kvargs.get('auto_probe', self._auto_probe)
if auto_probe is not 0:
if not self.probe(auto_probe):
raise EzErrors.ProbeError(self)
try:
ts_start = datetime.datetime.now()
# we want to enable the ssh-agent if-and-only-if we are
# not given a password or an ssh key file.
# in this condition it means we want to query the agent
# for available ssh keys
allow_agent = bool((self._auth_password is None) and
(self._ssh_private_key_file is None))
# open connection using ncclient transport
self._conn = netconf_ssh.connect(
host=self._hostname,
port=self._port,
username=self._auth_user,
password=self._auth_password,
hostkey_verify=False,
key_filename=self._ssh_private_key_file,
allow_agent=allow_agent,
ssh_config=self._sshconf_lkup(),
device_params={'name': 'junos', 'local': False})
except NcErrors.AuthenticationError as err:
# bad authentication credentials
raise EzErrors.ConnectAuthError(self)
except NcErrors.SSHError as err:
# this is a bit of a hack for now, since we want to
# know if the connection was refused or we simply could
# not open a connection due to reachability. so using
# a timestamp to differentiate the two conditions for now
# if the diff is < 3 sec, then assume the host is
# reachable, but NETCONF connection is refushed.
ts_err = datetime.datetime.now()
diff_ts = ts_err - ts_start
if diff_ts.seconds < 3:
raise EzErrors.ConnectRefusedError(self)
# at this point, we assume that the connection
# has timeed out due to ip-reachability issues
if str(err).find('not open') > 0:
raise EzErrors.ConnectTimeoutError(self)
else:
# otherwise raise a generic connection
# error for now. tag the new exception
# with the original for debug
cnx = EzErrors.ConnectError(self)
cnx._orig = err
raise cnx
except socket.gaierror:
# invalid DNS name, so unreachable
raise EzErrors.ConnectUnknownHostError(self)
except Exception as err:
# anything else, we will re-raise as a
# generic ConnectError
cnx_err = EzErrors.ConnectError(self)
cnx_err._orig = err
raise cnx_err
self.connected = True
self._nc_transform = self.transform
self._norm_transform = lambda: JXML.normalize_xslt.encode('UTF-8')
normalize = kvargs.get('normalize', self._normalize)
if normalize is True:
self.transform = self._norm_transform
gather_facts = kvargs.get('gather_facts', self._gather_facts)
if gather_facts is True:
self.facts_refresh()
return self
def close(self):
"""
Closes the connection to the device.
"""
self._conn.close_session()
self.connected = False
def _rpc_reply(self, rpc_cmd_e):
return self._conn.rpc(rpc_cmd_e)._NCElement__doc
# -----------------------------------------------------------------------
# Context Manager
# -----------------------------------------------------------------------
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._conn.connected and \
not isinstance(exc_val, EzErrors.ConnectError):
self.close()
|