/usr/lib/python2.7/dist-packages/jnpr/junos/transport/tty_netconf.py is in python-junos-eznc 2.0.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 | import re
import time
from lxml import etree
import select
import socket
import logging
import sys
from lxml.builder import E
from datetime import datetime, timedelta
from ncclient.operations.rpc import RPCReply, RPCError
from ncclient.xml_ import to_ele
import six
class PY6:
NEW_LINE = six.b('\n')
EMPTY_STR = six.b('')
NETCONF_EOM = six.b(']]>]]>')
STARTS_WITH = six.b("<!--")
__all__ = ['xmlmode_netconf']
_NETCONF_EOM = six.b(']]>]]>')
_xmlns = re.compile(six.b('xmlns=[^>]+'))
_xmlns_strip = lambda text: _xmlns.sub(PY6.EMPTY_STR, text)
_junosns = re.compile(six.b('junos:'))
_junosns_strip = lambda text: _junosns.sub(PY6.EMPTY_STR, text)
logger = logging.getLogger("jnpr.junos.tty_netconf")
# =========================================================================
# xmlmode_netconf
# =========================================================================
class tty_netconf(object):
"""
Basic Junos XML API for bootstraping through the TTY
"""
def __init__(self, tty):
self._tty = tty
self.hello = None
# -------------------------------------------------------------------------
# NETCONF session open and close
# -------------------------------------------------------------------------
def open(self, at_shell):
""" start the XML API process and receive the 'hello' message """
nc_cmd = ('junoscript', 'xml-mode')[at_shell]
self._tty.write(nc_cmd + ' netconf need-trailer')
mark_start = datetime.now()
mark_end = mark_start + timedelta(seconds=15)
while datetime.now() < mark_end:
time.sleep(0.1)
line = self._tty.read()
if line.startswith(PY6.STARTS_WITH):
break
else:
# exceeded the while loop timeout
raise RuntimeError("Netconify Error: netconf not responding")
self.hello = self._receive()
def close(self, force=False):
""" issue the XML API to close the session """
# if we do not have an open connection, then return now.
if force is False:
if self.hello is None:
return
self.rpc('close-session')
# removed flush
# -------------------------------------------------------------------------
# MISC device commands
# -------------------------------------------------------------------------
def zeroize(self):
""" issue a reboot to the device """
cmd = E.command('request system zeroize')
try:
encode = None if sys.version < '3' else 'unicode'
rsp = self.rpc(etree.tostring(cmd, encoding=encode))
except:
return False
return True
# -------------------------------------------------------------------------
# XML RPC command execution
# -------------------------------------------------------------------------
def rpc(self, cmd):
"""
Write the XML cmd and return the response as XML object.
:cmd:
<str> of the XML command. if the :cmd: is not XML, then
this routine will perform the brackets; i.e. if given
'get-software-information', this routine will turn
it into '<get-software-information/>'
NOTES:
The return XML object is the first child element after
the <rpc-reply>. There is also no error-checking
performing by this routine.
"""
if not cmd.startswith('<'):
cmd = '<{0}/>'.format(cmd)
rpc = six.b('<rpc>{0}</rpc>'.format(cmd))
logger.info('Calling rpc: %s' % rpc)
self._tty.rawwrite(rpc)
rsp = self._receive()
rsp = rsp.decode('utf-8') if isinstance(rsp, bytes) else rsp
reply = RPCReply(rsp)
errors = reply.errors
if len(errors) > 1:
raise RPCError(to_ele(reply._raw), errs=errors)
elif len(errors) == 1:
raise reply.error
return rsp
# -------------------------------------------------------------------------
# LOW-LEVEL I/O for reading back XML response
# -------------------------------------------------------------------------
def _receive(self):
""" process the XML response into an XML object """
rxbuf = PY6.EMPTY_STR
line = PY6.EMPTY_STR
while True:
try:
rd, wt, err = select.select([self._tty._rx], [], [], 0.1)
except select.error as err:
raise err
except socket.error as err:
raise err
if rd:
line, lastline = rd[0].read_until(PY6.NETCONF_EOM, 0.1), line
if not line:
continue
if _NETCONF_EOM in line or _NETCONF_EOM in lastline + line:
rxbuf = rxbuf + line
break
else:
rxbuf = rxbuf + line
if _NETCONF_EOM in rxbuf:
break
rxbuf = rxbuf.splitlines()
if _NETCONF_EOM in rxbuf[-1]:
rxbuf.pop()
try:
rxbuf = [i.strip() for i in rxbuf if i.strip() != PY6.EMPTY_STR]
rcvd_data = PY6.NEW_LINE.join(rxbuf)
logger.debug('Received: \n%s' % rcvd_data)
try:
etree.XML(rcvd_data)
except Exception as ex:
if isinstance(ex, etree.XMLSyntaxError):
rcvd_data = rcvd_data[:rcvd_data.index(']]>]]>')]
etree.XML(rcvd_data)
return rcvd_data
except:
if '</xnm:error>' in rxbuf:
for x in rxbuf:
if '<message>' in x:
return etree.XML(
'<error-in-receive>' + x + '</error-in-receive>')
else:
return etree.XML('<error-in-receive/>')
|