/usr/lib/python3/dist-packages/can/bus.py is in python3-can 2.0.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 | # -*- coding: utf-8 -*-
from __future__ import print_function, absolute_import
import abc
import logging
import threading
from can.broadcastmanager import ThreadBasedCyclicSendTask
logger = logging.getLogger(__name__)
class BusABC(object):
"""CAN Bus Abstract Base Class
Concrete implementations must implement the following methods:
* send
* recv
As well as setting the `channel_info` attribute to a string describing the
interface.
"""
#: a string describing the underlying bus channel
channel_info = 'unknown'
@abc.abstractmethod
def __init__(self, channel=None, can_filters=None, **config):
"""
:param channel:
The can interface identifier. Expected type is backend dependent.
:param list can_filters:
A list of dictionaries each containing a "can_id", a "can_mask",
and an "extended" key.
>>> [{"can_id": 0x11, "can_mask": 0x21, "extended": False}]
A filter matches, when ``<received_can_id> & can_mask == can_id & can_mask``
:param dict config:
Any backend dependent configurations are passed in this dictionary
"""
@abc.abstractmethod
def recv(self, timeout=None):
"""Block waiting for a message from the Bus.
:param float timeout: Seconds to wait for a message.
:return:
None on timeout or a :class:`can.Message` object.
"""
raise NotImplementedError("Trying to read from a write only bus?")
@abc.abstractmethod
def send(self, msg, timeout=None):
"""Transmit a message to CAN bus.
Override this method to enable the transmit path.
:param can.Message msg: A message object.
:param float timeout:
If > 0, wait up to this many seconds for message to be ACK:ed or
for transmit queue to be ready depending on driver implementation.
If timeout is exceeded, an exception will be raised.
Might not be supported by all interfaces.
:raise: :class:`can.CanError`
if the message could not be written.
"""
raise NotImplementedError("Trying to write to a readonly bus?")
def send_periodic(self, msg, period, duration=None):
"""Start sending a message at a given period on this bus.
:param can.Message msg:
Message to transmit
:param float period:
Period in seconds between each message
:param float duration:
The duration to keep sending this message at given rate. If
no duration is provided, the task will continue indefinitely.
:return: A started task instance
:rtype: can.CyclicSendTaskABC
Note the duration before the message stops being sent may not
be exactly the same as the duration specified by the user. In
general the message will be sent at the given rate until at
least *duration* seconds.
"""
if not hasattr(self, "_lock"):
# Create send lock for this bus
self._lock = threading.Lock()
return ThreadBasedCyclicSendTask(self, self._lock, msg, period, duration)
def __iter__(self):
"""Allow iteration on messages as they are received.
>>> for msg in bus:
... print(msg)
:yields: :class:`can.Message` msg objects.
"""
while True:
m = self.recv(timeout=1.0)
if m is not None:
yield m
logger.debug("done iterating over bus messages")
def set_filters(self, can_filters=None):
"""Apply filtering to all messages received by this Bus.
Calling without passing any filters will reset the applied filters.
:param list can_filters:
A list of dictionaries each containing a "can_id" and a "can_mask".
>>> [{"can_id": 0x11, "can_mask": 0x21}]
A filter matches, when ``<received_can_id> & can_mask == can_id & can_mask``
"""
raise NotImplementedError("Trying to set_filters on unsupported bus")
def flush_tx_buffer(self):
"""Discard every message that may be queued in the output buffer(s).
"""
pass
def shutdown(self):
"""
Called to carry out any interface specific cleanup required
in shutting down a bus.
"""
self.flush_tx_buffer()
__metaclass__ = abc.ABCMeta
|