This file is indexed.

/usr/lib/python2.7/dist-packages/can/interface.py is in python-can 2.0.0-1.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
from __future__ import absolute_import

import can
import importlib

from can.broadcastmanager import CyclicSendTaskABC, MultiRateCyclicSendTaskABC
from can.util import load_config

# interface_name => (module, classname)
BACKENDS = {
    'kvaser':           ('can.interfaces.kvaser', 'KvaserBus'),
    'socketcan_ctypes': ('can.interfaces.socketcan', 'SocketcanCtypes_Bus'),
    'socketcan_native': ('can.interfaces.socketcan', 'SocketcanNative_Bus'),
    'serial':           ('can.interfaces.serial.serial_can', 'SerialBus'),
    'pcan':             ('can.interfaces.pcan', 'PcanBus'),
    'usb2can':          ('can.interfaces.usb2can', 'Usb2canBus'),
    'ixxat':            ('can.interfaces.ixxat', 'IXXATBus'),
    'nican':            ('can.interfaces.nican', 'NicanBus'),
    'iscan':            ('can.interfaces.iscan', 'IscanBus'),
    'virtual':          ('can.interfaces.virtual', 'VirtualBus'),
    'neovi':            ('can.interfaces.neovi_api', 'NeoVIBus'),
    'vector':           ('can.interfaces.vector', 'VectorBus'),
    'slcan':            ('can.interfaces.slcan', 'slcanBus')
}


class Bus(object):
    """
    Instantiates a CAN Bus of the given `bustype`, falls back to reading a
    configuration file from default locations.
    """

    @classmethod
    def __new__(cls, other, channel=None, *args, **kwargs):
        """
        Takes the same arguments as :class:`can.BusABC` with the addition of:

        :param kwargs:
            Should contain a bustype key with a valid interface name.

        :raises:
            NotImplementedError if the bustype isn't recognized
        :raises:
            ValueError if the bustype or channel isn't either passed as an argument
            or set in the can.rc config.

        """
        config = load_config(config={
            'interface': kwargs.get('bustype'),
            'channel': channel
        })

        if 'bustype' in kwargs:
            # remove the bustype so it doesn't get passed to the backend
            del kwargs['bustype']
        interface = config['interface']
        channel = config['channel']

        # Import the correct Bus backend
        try:
            (module_name, class_name) = BACKENDS[interface]
        except KeyError:
            raise NotImplementedError("CAN interface '{}' not supported".format(interface))

        try:
            module = importlib.import_module(module_name)
        except Exception as e:
            raise ImportError(
                "Cannot import module {} for CAN interface '{}': {}".format(module_name, interface, e)
            )
        try:
            cls = getattr(module, class_name)
        except Exception as e:
            raise ImportError(
                "Cannot import class {} from module {} for CAN interface '{}': {}".format(
                    class_name, module_name, interface, e
                )
            )

        return cls(channel, **kwargs)


class CyclicSendTask(CyclicSendTaskABC):

    @classmethod
    def __new__(cls, other, channel, *args, **kwargs):

        config = load_config(config={'channel': channel})

        # Import the correct implementation of CyclicSendTask
        if config['interface'] == 'socketcan_ctypes':
            from can.interfaces.socketcan.socketcan_ctypes import CyclicSendTask as _ctypesCyclicSendTask
            cls = _ctypesCyclicSendTask
        elif config['interface'] == 'socketcan_native':
            from can.interfaces.socketcan.socketcan_native import CyclicSendTask as _nativeCyclicSendTask
            cls = _nativeCyclicSendTask
        else:
            raise can.CanError("Current CAN interface doesn't support CyclicSendTask")

        return cls(config['channel'], *args, **kwargs)


class MultiRateCyclicSendTask(MultiRateCyclicSendTaskABC):

    @classmethod
    def __new__(cls, other, channel, *args, **kwargs):

        config = load_config(config={'channel': channel})

        # Import the correct implementation of CyclicSendTask
        if config['interface'] == 'socketcan_ctypes':
            from can.interfaces.socketcan.socketcan_ctypes import MultiRateCyclicSendTask as _ctypesMultiRateCyclicSendTask
            cls = _ctypesMultiRateCyclicSendTask
        elif config['interface'] == 'socketcan_native':
            from can.interfaces.socketcan.socketcan_native import MultiRateCyclicSendTask as _nativeMultiRateCyclicSendTask
            cls = _nativeMultiRateCyclicSendTask
        else:
            can.log.info("Current CAN interface doesn't support CyclicSendTask")

        return cls(config['channel'], *args, **kwargs)