/usr/share/pyshared/gnatpython/expect.py is in python-gnatpython 54-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 | ############################################################################
# #
# EXPECT.PY #
# #
# Copyright (C) 2010 Ada Core Technologies, Inc. #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/> #
# #
############################################################################
import _term
import os
import re
import datetime
from time import sleep
from gnatpython.fileutils import which
EXPECT_TIMEOUT = -2
EXPECT_DIED = -3
class ExpectError (Exception):
"""Expect exception"""
def __init__(self, cmd, msg):
Exception.__init__(self, cmd, msg)
self.cmd = cmd
self.msg = msg
def __str__(self):
return "%s: %s" % (self.cmd, self.msg)
class ExpectProcess(object):
"""Expect Main class
ATTRIBUTES
command_line: list of strings containg the command line used to spawn the
process.
status: The return code. None while the command is still
running, and an integer after method "close" has
been called.
"""
def __init__(self, command_line):
"""Constructor
PARAMETERS
command_line: A list of strings representing the command line to be
spawned.
RETURN VALUE
A ExpectProcess object
"""
# Convert the command line to a list of string is needed
command_line = [str(arg) for arg in command_line]
if len(command_line) < 1:
raise ExpectError('__init__',
'expect a non empty list as argument')
command_line[0] = which(command_line[0])
# Store the command line used
self.command_line = command_line
# Spawn the process
(self.input, self.output, self.error, self.pid, self.handler) = \
_term.non_blocking_spawn(tuple(command_line))
# Initialize our buffer
self.buffer = ""
# Keep the state of the process
self.process_is_dead = False
# This is where we store that last successful expect result
self.last_match = None
# This is where the command returned status will be stored
# when the command has exited. For the moment, it is not
# available.
self.status = None
def __poll(self, timeout):
result = _term.poll((self.output, ), timeout)
if result[0] > 0:
read_status = _term.read(self.output, 16384)
if read_status[0] > 0:
self.buffer += read_status[1]
elif read_status[0] < 0:
self.process_is_dead = True
def flush(self):
"""Flush all the output generated by the process up to the call
to this method."""
self.__poll(0)
self.buffer = ""
def sendline(self, msg):
return self.send(msg + '\n')
def send(self, msg, add_lf=True, flush_buffer=False):
"""Send a msg to the program
PARAMETERS
msg: a string
RETURN VALUE
Return 1 if OK, 0 otherwise.
"""
if self.handler is None:
raise ExpectError('send', 'process has been closed')
if add_lf:
msg += '\n'
if flush_buffer:
self.flush()
write_status = _term.write(self.input, msg)
if write_status < 0:
return 0
else:
return 1
def expect(self, patterns, timeout):
if self.handler is None:
raise ExpectError('expect', 'process has been closed')
match = None
result = 0
expect_start = datetime.datetime.utcnow()
time_left = int(timeout * 1000.0)
while match is None and time_left > 0:
# Do we have a match with the current output
for index, pattern in enumerate(patterns):
match = re.search(pattern, self.buffer)
if match is not None:
result = index
break
if match is not None:
break
else:
# We don't have a match so poll for new output
self.__poll(time_left)
if self.process_is_dead:
return EXPECT_DIED
# update time_left.
# The update is done only if current time is superior to time
# at which the function started. This test might seem a bit
# weird but on some Linux machines on VmWare we have found
# huge clock drift that the system tries to compensate. The
# consequence is that we cannot assume that the clock is
# monotonic.
current_time = datetime.datetime.utcnow()
if current_time > expect_start:
time_spent = (current_time - expect_start)
time_left = int(timeout * 1000.0) - \
(time_spent.seconds * 1000 + \
time_spent.microseconds / 1000)
if match is not None:
self.last_match = (result, self.buffer[:match.start(0)], match)
self.buffer = self.buffer[match.end(0):]
return result
if time_left < 0:
return EXPECT_TIMEOUT
def out(self):
if self.last_match is None:
return ("", "")
else:
return (self.last_match[1], self.last_match[2].group(0))
def close(self):
"""If the underlying process is not dead yet, kill it.
Set the status attribute to the command return code."""
if self.handler is not None:
self.interrupt()
sleep(0.05)
_term.terminate(self.handler)
self.status = _term.waitpid(self.handler)
self.handler = None
def interrupt(self):
if not self.process_is_dead and self.handler is not None:
_term.interrupt(self.handler)
def set_timer(self, delay):
self.timer_end = datetime.datetime.utcnow() + \
datetime.timedelta(seconds=delay)
def has_timer_expired(self):
if self.timer_end < datetime.datetime.utcnow():
return True
else:
return False
|