This file is indexed.

/usr/lib/python3/dist-packages/pyvisa-py/sessions.py is in python3-pyvisa-py 0.2-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# -*- coding: utf-8 -*-
"""
    pyvisa-py.session
    ~~~~~~~~~~~~~~~~~

    Base Session class.


    :copyright: 2014 by PyVISA-py Authors, see AUTHORS for more details.
    :license: MIT, see LICENSE for more details.
"""

from __future__ import division, unicode_literals, print_function, absolute_import

import abc
import time

from pyvisa import logger, constants, attributes, compat, rname

from . import common


class UnknownAttribute(Exception):

    def __init__(self, attribute):
        self.attribute = attribute

    def __str__(self):
        attr = self.attribute
        if isinstance(attr, int):
            try:
                name = attributes.AttributesByID[attr].visa_name
            except KeyError:
                name = 'Name not found'

            return 'Unknown attribute %s (%s - %s)' % (attr, hex(attr), name)

        return 'Unknown attribute %s' % attr

    __repr__ = __str__


class Session(compat.with_metaclass(abc.ABCMeta)):
    """A base class for Session objects.

    Just makes sure that common methods are defined and information is stored.

    :param resource_manager_session: The session handle of the parent Resource Manager
    :param resource_name: The resource name.
    :param parsed: the parsed resource name (optional).
                   If not provided, the resource_name will be parsed.
    """

    @abc.abstractmethod
    def _get_attribute(self, attribute):
        """Get the value for a given VISA attribute for this session.

        Use to implement custom logic for attributes.

        :param attribute: Resource attribute for which the state query is made
        :return: The state of the queried attribute for a specified resource, return value of the library call.
        :rtype: (unicode | str | list | int, VISAStatus)
        """

    @abc.abstractmethod
    def _set_attribute(self, attribute, attribute_state):
        """Set the attribute_state value for a given VISA attribute for this session.

        Use to implement custom logic for attributes.

        :param attribute: Resource attribute for which the state query is made.
        :param attribute_state: value.
        :return: The return value of the library call.
        :rtype: VISAStatus
        """

    @abc.abstractmethod
    def close(self):
        """Close the session. Use it to do final clean ups.
        """

    #: Maps (Interface Type, Resource Class) to Python class encapsulating that resource.
    #: dict[(Interface Type, Resource Class) , Session]
    _session_classes = dict()

    #: Session type as (Interface Type, Resource Class)
    session_type = None

    @classmethod
    def get_low_level_info(cls):
        return ''

    @classmethod
    def iter_valid_session_classes(cls):
        """Yield (Interface Type, Resource Class), Session class pair for
        valid sessions classes.
        """

        for key, val in cls._session_classes.items():
            if issubclass(val, Session):
                yield key, val

    @classmethod
    def iter_session_classes_issues(cls):
        """Yield (Interface Type, Resource Class), Issues class pair for
        invalid sessions classes (i.e. those with import errors).
        """
        for key, val in cls._session_classes.items():
            try:
                yield key, getattr(val, 'session_issue')
            except AttributeError:
                pass

    @classmethod
    def get_session_class(cls, interface_type, resource_class):
        """Return the session class for a given interface type and resource class.

        :type interface_type: constants.InterfaceType
        :type resource_class: str
        :return: Session
        """
        try:
            return cls._session_classes[(interface_type, resource_class)]
        except KeyError:
            raise ValueError('No class registered for %s, %s' % (interface_type, resource_class))

    @classmethod
    def register(cls, interface_type, resource_class):
        """Register a session class for a given interface type and resource class.

        :type interface_type: constants.InterfaceType
        :type resource_class: str
        """
        def _internal(python_class):
            if (interface_type, resource_class) in cls._session_classes:
                logger.warning('%s is already registered in the ResourceManager. '
                               'Overwriting with %s' % ((interface_type, resource_class), python_class))

            python_class.session_type = (interface_type, resource_class)
            cls._session_classes[(interface_type, resource_class)] = python_class
            return python_class
        return _internal

    @classmethod
    def register_unavailable(cls, interface_type, resource_class, msg):
        """Register an unavailable session class for a given interface type and resource class.
        raising a ValueError if called.

        :type interface_type: constants.InterfaceType
        :type resource_class: str
        """
        # noinspection PyUnusedLocal
        def _internal(*args, **kwargs):
            raise ValueError(msg)

        _internal.session_issue = msg

        if (interface_type, resource_class) in cls._session_classes:
            logger.warning('%s is already registered in the ResourceManager. '
                           'Overwriting with unavailable %s' % ((interface_type, resource_class), msg))

        cls._session_classes[(interface_type, resource_class)] = _internal

    def __init__(self, resource_manager_session, resource_name, parsed=None):
        if isinstance(resource_name, common.MockInterface):
            parsed = rname.parse_resource_name(resource_name.resource_name)
            parsed['mock'] = resource_name

        elif parsed is None:
            parsed = rname.parse_resource_name(resource_name)

        self.parsed = parsed

        #: Used as a place holder for the object doing the lowlevel communication.
        self.interface = None

        #: Used for attributes not handled by the underlying interface.
        #: Values are get or set automatically by get_attribute and set_attribute
        #: Add your own by overriding after_parsing.
        self.attrs = {constants.VI_ATTR_RM_SESSION: resource_manager_session,
                      constants.VI_ATTR_RSRC_NAME: str(parsed),
                      constants.VI_ATTR_RSRC_CLASS: parsed.resource_class,
                      constants.VI_ATTR_INTF_TYPE: parsed.interface_type}
        self.after_parsing()

    def after_parsing(self):
        """Override this method to provide custom initialization code, to be
        called after the resourcename is properly parsed
        """

    def get_attribute(self, attribute):
        """Get the value for a given VISA attribute for this session.

        Does a few checks before and calls before dispatching to `_get_attribute`.

        :param attribute: Resource attribute for which the state query is made
        :return: The state of the queried attribute for a specified resource, return value of the library call.
        :rtype: (unicode | str | list | int, VISAStatus)
        """

        # Check if the attribute value is defined.
        try:
            attr = attributes.AttributesByID[attribute]
        except KeyError:
            return 0, constants.StatusCode.error_nonsupported_attribute

        # Check if the attribute is defined for this session type.
        if not attr.in_resource(self.session_type):
            return 0, constants.StatusCode.error_nonsupported_attribute

        # Check if reading the attribute is allowed.
        if not attr.read:
            raise Exception('Do not now how to handle write only attributes.')

        # First try to answer those attributes that are common to all session types
        # or user defined because they are not defined by the interface.
        if attribute in self.attrs:
            return self.attrs[attribute], constants.StatusCode.success

        elif attribute == constants.VI_ATTR_TMO_VALUE:
            return self.timeout, constants.StatusCode.success

        # Dispatch to `_get_attribute`, which must be implemented by subclasses.

        try:
            return self._get_attribute(attribute)
        except UnknownAttribute as e:
            logger.exception(str(e))
            return 0, constants.StatusCode.error_nonsupported_attribute

    def set_attribute(self, attribute, attribute_state):
        """Set the attribute_state value for a given VISA attribute for this session.

        Does a few checks before and calls before dispatching to `_gst_attribute`.

        :param attribute: Resource attribute for which the state query is made.
        :param attribute_state: value.
        :return: The return value of the library call.
        :rtype: VISAStatus
        """

        # Check if the attribute value is defined.
        try:
            attr = attributes.AttributesByID[attribute]
        except KeyError:
            return constants.StatusCode.error_nonsupported_attribute

        # Check if the attribute is defined for this session type.
        if not attr.in_resource(self.session_type):
            return constants.StatusCode.error_nonsupported_attribute

        # Check if writing the attribute is allowed.
        if not attr.write:
            return constants.StatusCode.error_attribute_read_only

        # First try to answer those attributes that are common to all session types
        # or user defined because they are not defined by the interface.
        if attribute in self.attrs:
            self.attrs[attribute] = attribute_state
            return constants.StatusCode.success

        elif attribute == constants.VI_ATTR_TMO_VALUE:
            try:
                self.timeout = attribute_state
            except:
                return constants.StatusCode.error_nonsupported_attribute_state

            return constants.StatusCode.success

        # Dispatch to `_set_attribute`, which must be implemented by subclasses.

        try:
            return self._set_attribute(attribute, attribute_state)
        except ValueError:
            return constants.StatusCode.error_nonsupported_attribute_state
        except NotImplementedError:
            e = UnknownAttribute(attribute)
            logger.exception(str(e))
            return constants.StatusCode.error_nonsupported_attribute
        except UnknownAttribute as e:
            logger.exception(str(e))
            return constants.StatusCode.error_nonsupported_attribute

    def _read(self, reader, count, end_indicator_checker, suppress_end_en,
              termination_char, termination_char_en, timeout_exception):
        """Reads data from device or interface synchronously.

        Corresponds to viRead function of the VISA library.

        :param reader: Function to read a single byte.
        :type reader: () -> byte
        :param count: Number of bytes to be read.
        :type count: int
        :param end_indicator_checker: Function to check if the byte.
        :type end_indicator_checker: (byte) -> boolean
        :param suppress_end_en: suppress end.
        :type suppress_end_en: bool
        :param termination_char: Number of bytes to be read.
        :param termination_char_en: termination char enabled.
        :type termination_char_en: boolean
        :param: timeout_exception: Exception to capture time out for the given interface.
        :type: Exception
        :return: data read, return value of the library call.
        :rtype: bytes, constants.StatusCode
        """

        timeout = self.get_attribute(constants.VI_ATTR_TMO_VALUE)[0] / 1000.

        start = time.time()
        out = b''
        while True:
            try:
                current = reader()
            except timeout_exception:
                return out, constants.StatusCode.error_timeout

            if current:
                out += current
                end_indicator_received = end_indicator_checker(current)
                if end_indicator_received and not suppress_end_en:
                    # RULE 6.1.1
                    return out, constants.StatusCode.success

                elif not end_indicator_received and current == termination_char and termination_char_en:
                    # RULE 6.1.2
                    return out, constants.StatusCode.success_termination_character_read

                elif not end_indicator_received and current != termination_char and len(out) == count:
                    # RULE 6.1.3
                    return out, constants.StatusCode.success_max_count_read

            if time.time() - start > timeout:
                return out, constants.StatusCode.error_timeout