This file is indexed.

/usr/lib/python2.7/dist-packages/jnpr/junos/transport/tty.py is in python-junos-eznc 2.0.1-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
from time import sleep
import logging

from jnpr.junos import exception as EzErrors
from jnpr.junos.transport.tty_netconf import tty_netconf

logger = logging.getLogger("jnpr.junos.tty")

__all__ = ['Terminal']

# =========================================================================
# Terminal class
# =========================================================================


class Terminal(object):

    """
    Terminal is used to bootstrap Junos New Out of the Box (NOOB) device
    over the CONSOLE port.  The general use-case is to setup the minimal
    configuration so that the device is IP reachable using SSH
    and NETCONF for remote management.

    Serial is needed for Junos devices that do not support
    the DHCP 'auto-installation' or 'ZTP' feature; i.e. you *MUST*
    to the NOOB configuration via the CONSOLE.

    Serial is also useful for situations even when the Junos
    device supports auto-DHCP, but is not an option due to the
    specific situation
    """
    TIMEOUT = 0.2           # serial readline timeout, seconds
    EXPECT_TIMEOUT = 10     # total read timeout, seconds
    LOGIN_RETRY = 20         # total number of passes thru login state-machine

    _ST_INIT = 0
    _ST_LOADER = 1
    _ST_LOGIN = 2
    _ST_PASSWD = 3
    _ST_DONE = 4
    _ST_BAD_PASSWD = 5
    _ST_TTY_NOLOGIN = 6
    _ST_TTY_OPTION = 7
    _ST_TTY_HOTKEY = 8

    _re_pat_login = '(?P<login>ogin:\s*$)'

    _RE_PAT = [
        '(?P<loader>oader>\s*$)',
        _re_pat_login,
        '(?P<passwd>assword:\s*$)',
        '(?P<badpasswd>ogin incorrect)',
        '(?P<already_closed>session end at .*\n%)',
        '(?P<shell>%|#\s*$)',
        '(?P<cli>[^\\-"]>\s*$)',
        '(?P<option>Enter your option:\s*$)',
        '(?P<hotkey>connection: <CTRL>Z)',
    ]

    # -----------------------------------------------------------------------
    # CONSTRUCTOR
    # -----------------------------------------------------------------------

    def __init__(self, **kvargs):
        """
        :kvargs['user']:
          defaults to 'root'

        :kvargs['passwd']:
          defaults to empty; NOOB Junos devics there is
          no root password initially

        :kvargs['attempts']:
          the total number of login attempts thru the login
          state-machine
        """
        # logic args
        self.hostname = self.__dict__.get('host')
        self.user = kvargs.get('user', 'root')
        self.passwd = kvargs.get('passwd', '')
        self.c_user = kvargs.get('s_user', self.user)
        self.c_passwd = kvargs.get('s_passwd', self.passwd)
        self.login_attempts = kvargs.get('attempts') or self.LOGIN_RETRY

        # misc setup
        self.nc = tty_netconf(self)
        self.state = self._ST_INIT
        self._badpasswd = 0
        self._loader = 0

    @property
    def tty_name(self):
        return self._tty_name

    # -----------------------------------------------------------------------
    # Login/logout
    # -----------------------------------------------------------------------

    def login(self):
        """
        open the TTY connection and login.  once the login is successful,
        start the NETCONF XML API process
        """
        logger.info('TTY: connecting to TTY:{0} ...'.format(self.tty_name))
        self._tty_open()

        logger.info('TTY: logging in......')

        self.state = self._ST_INIT
        self._login_state_machine()

        # now start NETCONF XML
        logger.info('TTY: OK.....starting NETCONF')
        self.nc.open(at_shell=self.at_shell)
        return True

    def logout(self):
        """
        cleanly logout of the TTY
        """
        logger.info('logout: logging out.....')
        self.nc.close()
        self._logout_state_machine()
        return True

        # ---------------------------------------------------------------------
        # TTY logout state-machine
        # ---------------------------------------------------------------------

    def _logout_state_machine(self, attempt=0):
        if 10 == attempt:
            raise RuntimeError('logout_sm_failure')

        prompt, found = self.read_prompt()

        def _ev_login():
            # back at login prompt, so we are cleanly done!
            self._tty_close()

        def _ev_shell():
            self.write('exit')

        def _ev_cli():
            self.write('exit')

        # Connection closed by foreign host
        def _ev_already_closed():
            return True

        _ev_tbl = {
            'login': _ev_login,
            'shell': _ev_shell,
            'cli': _ev_cli,
            'already_closed': _ev_already_closed
        }

        # hack for now
        # in case of telnet to management port, after writing exit on console
        # it exists completely and returns None
        ###
        if found is not None:
            _ev_tbl[found]()
        else:
            return True

        if found == 'login' or found == 'already_closed':
            return True

        else:
            sleep(1)
            self._logout_state_machine(attempt=attempt + 1)

    # -----------------------------------------------------------------------
    # TTY login state-machine
    # -----------------------------------------------------------------------
    def _login_state_machine(self, attempt=0):
        if self.login_attempts == attempt:
            raise RuntimeError('login_sm_failure')

        prompt, found = self.read_prompt()

        def _ev_loader():
            self.state = self._ST_LOADER
            self.write('boot')
            self.write('\n')
            sleep(300)
            self._login_state_machine(attempt=0)
            self._loader += 1
            if self._loader == 2:
                raise RuntimeError("propably corrupted image, stuck in loader")

        def _ev_login():
            self.state = self._ST_LOGIN
            self.write(self.user)

        def _ev_passwd():
            self.state = self._ST_PASSWD
            self.write(self.passwd)

        def _ev_bad_passwd():
            self.state = self._ST_BAD_PASSWD
            self.write('\n')
            self._badpasswd += 1
            if self._badpasswd == 2:
                # raise RuntimeError("Bad username/password")
                raise EzErrors.ConnectAuthError(self, "Bad username/password")
            # return through and try again ... could have been
            # prior failed attempt

        def _ev_tty_nologin():
            if self._ST_INIT == self.state:
                # assume we're in a hung state, i.e. we don't see
                # a login prompt for whatever reason
                self.state = self._ST_TTY_NOLOGIN
                self.write('<close-session/>')  # @@@ this is a hack
                # if console connection have a banner or warning
                # comment-out line above and uncoment lines bellow ... better hack
                # sleep(5)
                # self.write('\n')

        def _ev_shell():
            if self.state == self._ST_INIT:
                # this means that the shell was left
                # open.  probably not a good thing,
                # so issue a logging message, but move on.
                logger.warning('login_warn: Shell login was open!!')

            self.at_shell = True
            self.state = self._ST_DONE
            # if we are here, then we are done

        def _ev_cli():
            if self.state == self._ST_INIT:
                # this means that the shell was left open.  probably not a good thing,
                # so issue a logging message, hit <ENTER> and try again just to be
                # sure...
                logger.warning('login_warn: waiting on TTY..... ')
                sleep(5)
                #  return

            self.at_shell = False
            self.state = self._ST_DONE

        def _ev_option():
            self.state = self._ST_TTY_OPTION
            self.write("1")

        def _ev_hot_key():
            self.state = self._ST_TTY_HOTKEY
            self.write("\n")

        _ev_tbl = {
            'loader': _ev_loader,
            'login': _ev_login,
            'passwd': _ev_passwd,
            'badpasswd': _ev_bad_passwd,
            'shell': _ev_shell,
            'cli': _ev_cli,
            'option': _ev_option,
            'hotkey': _ev_hot_key
        }

        _ev_tbl.get(found, _ev_tty_nologin)()

        if self.state == self._ST_DONE:
            return True
        else:
            # if we are here, then loop the event again
            self._login_state_machine(attempt + 1)