/usr/lib/pypy/dist-packages/zmq/error.py is in pypy-zmq 16.0.2-2build2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | """0MQ Error classes and functions."""
# Copyright (C) PyZMQ Developers
# Distributed under the terms of the Modified BSD License.
from errno import EINTR
class ZMQBaseError(Exception):
"""Base exception class for 0MQ errors in Python."""
pass
class ZMQError(ZMQBaseError):
"""Wrap an errno style error.
Parameters
----------
errno : int
The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
used.
msg : string
Description of the error or None.
"""
errno = None
def __init__(self, errno=None, msg=None):
"""Wrap an errno style error.
Parameters
----------
errno : int
The ZMQ errno or None. If None, then ``zmq_errno()`` is called and
used.
msg : string
Description of the error or None.
"""
from zmq.backend import strerror, zmq_errno
if errno is None:
errno = zmq_errno()
if isinstance(errno, int):
self.errno = errno
if msg is None:
self.strerror = strerror(errno)
else:
self.strerror = msg
else:
if msg is None:
self.strerror = str(errno)
else:
self.strerror = msg
# flush signals, because there could be a SIGINT
# waiting to pounce, resulting in uncaught exceptions.
# Doing this here means getting SIGINT during a blocking
# libzmq call will raise a *catchable* KeyboardInterrupt
# PyErr_CheckSignals()
def __str__(self):
return self.strerror
def __repr__(self):
return "%s('%s')" % (self.__class__.__name__, str(self))
class ZMQBindError(ZMQBaseError):
"""An error for ``Socket.bind_to_random_port()``.
See Also
--------
.Socket.bind_to_random_port
"""
pass
class NotDone(ZMQBaseError):
"""Raised when timeout is reached while waiting for 0MQ to finish with a Message
See Also
--------
.MessageTracker.wait : object for tracking when ZeroMQ is done
"""
pass
class ContextTerminated(ZMQError):
"""Wrapper for zmq.ETERM
.. versionadded:: 13.0
"""
def __init__(self, errno='ignored', msg='ignored'):
from zmq import ETERM
super(ContextTerminated, self).__init__(ETERM)
class Again(ZMQError):
"""Wrapper for zmq.EAGAIN
.. versionadded:: 13.0
"""
def __init__(self, errno='ignored', msg='ignored'):
from zmq import EAGAIN
super(Again, self).__init__(EAGAIN)
try:
InterruptedError
except NameError:
InterruptedError = OSError
class InterruptedSystemCall(ZMQError, InterruptedError):
"""Wrapper for EINTR
This exception should be caught internally in pyzmq
to retry system calls, and not propagate to the user.
.. versionadded:: 14.7
"""
def __init__(self, errno='ignored', msg='ignored'):
super(InterruptedSystemCall, self).__init__(EINTR)
def __str__(self):
s = super(InterruptedSystemCall, self).__str__()
return s + ": This call should have been retried. Please report this to pyzmq."
def _check_rc(rc, errno=None):
"""internal utility for checking zmq return condition
and raising the appropriate Exception class
"""
if rc == -1:
if errno is None:
from zmq.backend import zmq_errno
errno = zmq_errno()
from zmq import EAGAIN, ETERM
if errno == EINTR:
raise InterruptedSystemCall(errno)
elif errno == EAGAIN:
raise Again(errno)
elif errno == ETERM:
raise ContextTerminated(errno)
else:
raise ZMQError(errno)
_zmq_version_info = None
_zmq_version = None
class ZMQVersionError(NotImplementedError):
"""Raised when a feature is not provided by the linked version of libzmq.
.. versionadded:: 14.2
"""
min_version = None
def __init__(self, min_version, msg='Feature'):
global _zmq_version
if _zmq_version is None:
from zmq import zmq_version
_zmq_version = zmq_version()
self.msg = msg
self.min_version = min_version
self.version = _zmq_version
def __repr__(self):
return "ZMQVersionError('%s')" % str(self)
def __str__(self):
return "%s requires libzmq >= %s, have %s" % (self.msg, self.min_version, self.version)
def _check_version(min_version_info, msg='Feature'):
"""Check for libzmq
raises ZMQVersionError if current zmq version is not at least min_version
min_version_info is a tuple of integers, and will be compared against zmq.zmq_version_info().
"""
global _zmq_version_info
if _zmq_version_info is None:
from zmq import zmq_version_info
_zmq_version_info = zmq_version_info()
if _zmq_version_info < min_version_info:
min_version = '.'.join(str(v) for v in min_version_info)
raise ZMQVersionError(min_version, msg)
__all__ = [
'ZMQBaseError',
'ZMQBindError',
'ZMQError',
'NotDone',
'ContextTerminated',
'InterruptedSystemCall',
'Again',
'ZMQVersionError',
]
|