This file is indexed.

/usr/lib/python2.7/dist-packages/jnpr/junos/transport/tty_telnet.py is in python-junos-eznc 2.0.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from time import sleep
import telnetlib
import logging
import sys
import six

from jnpr.junos.transport.tty import Terminal

# Module-level logger used by the classes in this file; the logger name is a
# fixed string rather than being derived from __name__.
logger = logging.getLogger("jnpr.junos.tty_telnet")

# -------------------------------------------------------------------------
# Terminal connection over TELNET CONSOLE
# -------------------------------------------------------------------------


class PY6:
    """Byte-string constants that work on both Python 2 and Python 3.

    Every value is produced with ``six.b`` so that it can be compared with,
    or searched for in, the raw data returned by the telnet connection.
    """
    # substring looked for in received data by Telnet.read_prompt
    IN_USE = six.b('in use')
    # NETCONF end-of-message marker
    NETCONF_EOM = six.b(']]>]]>')
    EMPTY_STR = six.b('')
    # line terminator passed to read_until by Telnet.read
    NEW_LINE = six.b('\n')


class Telnet(Terminal):
    """Terminal transport that talks to a console through a TELNET server.

    The class wraps a ``telnetlib.Telnet`` instance and supplies the
    open/close and read/write primitives that the ``Terminal`` base class
    calls.
    """
    RETRY_OPEN = 3                # number of attempts to open TTY
    RETRY_BACKOFF = 2             # seconds to wait between retries

    def __init__(self, host, port, **kvargs):
        """
        :host:
          The hostname or ip-addr of the terminal server

        :port:
          The TCP port that maps to the TTY device on the
          console server

        :kvargs['timeout']:
          this is the tty read polling timeout.
          generally you should not have to tweak this.
          It is also handed to telnetlib as the connect timeout when the
          session is opened.

        :kvargs['baud']:
          line speed used to pace :meth:`rawwrite`; defaults to 9600.
          A value of 0 turns pacing off.
        """
        # initialize the underlying TTY device

        self._tn = telnetlib.Telnet()
        self._rx = self._tn
        self.host = host
        self.port = port
        self.timeout = kvargs.get('timeout', self.TIMEOUT)
        self.baud = kvargs.get('baud', 9600)
        self._tty_name = "{0}:{1}".format(host, port)

        Terminal.__init__(self, **kvargs)

    # -------------------------------------------------------------------------
    # I/O open close called from Terminal class
    # -------------------------------------------------------------------------

    def _tty_open(self):
        """
        Open the TELNET session, trying up to ``RETRY_OPEN`` times and
        waiting ``RETRY_BACKOFF`` seconds between attempts.  Once connected
        a bare <ENTER> is sent.

        :raises RuntimeError:
          "open_fail: port not ready" when every attempt failed
        """
        attempts = self.RETRY_OPEN
        for attempt in range(1, attempts + 1):
            try:
                self._tn.open(self.host, self.port, self.timeout)
                break
            except Exception as err:
                # Deliberately broad: any failure to connect is treated as
                # "not ready yet".  The cause is logged so it is not lost.
                if attempt < attempts:
                    logger.info(
                        "TTY busy: checking back in %s ... (%s)",
                        self.RETRY_BACKOFF, err)
                    sleep(self.RETRY_BACKOFF)
                else:
                    # no point in sleeping when there is no attempt left
                    logger.info(
                        "TTY open failed after %s attempts: %s",
                        attempts, err)
        else:
            # reached only when the loop never hit ``break``
            raise RuntimeError("open_fail: port not ready")
        self.write('\n')

    def _tty_close(self):
        """ close the underlying telnet connection """
        self._tn.close()

    # -------------------------------------------------------------------------
    # I/O read and write called from Terminal class
    # -------------------------------------------------------------------------

    def write(self, content):
        """ write content + <ENTER> """
        # NOTE(review): content is logged verbatim at debug level; confirm
        # that callers never pass credentials through here.
        logger.debug('Write: %s', content)
        self._tn.write(six.b((content + '\n')))

    def rawwrite(self, content):
        """
        write content as-is

        :content:
          the data to send.  Byte strings are sent unchanged; text strings
          are encoded as UTF-8 first.

        When ``baud`` is 0 the data is written in one go, otherwise it is
        written one byte at a time with a delay derived from ``baud``.
        """
        logger.debug('rawwrite: %s', content)
        if isinstance(content, six.text_type):
            content = content.encode('utf-8')

        # If baud set to 0 write full speed
        if int(self.baud) == 0:
            self._tn.write(content)
            return None

        # Write data according to defined baud
        # each byte is budgeted 10 bit-times on the line
        # (8 data bits plus 2 additional bits)
        wtime = 10 / float(self.baud)
        # Slice rather than iterate: on Python 3 iterating bytes yields ints,
        # and decoding/re-encoding would corrupt non-ASCII data.
        for pos in range(len(content)):
            self._tn.write(content[pos:pos + 1])
            sleep(wtime)                          # do not remove

    def read(self):
        """ read a single line """
        return self._tn.read_until(PY6.NEW_LINE, self.EXPECT_TIMEOUT)

    def read_prompt(self):
        """
        Wait until one of ``Terminal._RE_PAT`` matches the incoming data or
        ``EXPECT_TIMEOUT`` expires.

        :returns:
          ``(data, name)`` where data is what was received and name is the
          ``lastgroup`` of the match; ``(None, None)`` when nothing matched

        :raises RuntimeError:
          "open_fail: port already in use" when the received data contains
          the "in use" marker
        """
        patterns = [six.b(i) for i in Terminal._RE_PAT]
        _, match, data = self._tn.expect(patterns, self.EXPECT_TIMEOUT)
        logger.debug('Got: %s', data)
        if PY6.IN_USE in data:
            raise RuntimeError("open_fail: port already in use")
        if match is None:
            return (None, None)
        return (data, match.lastgroup)