This file is indexed.

/usr/lib/python3/dist-packages/qtconsole/base_frontend_mixin.py is in python3-qtconsole 4.2.1-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
"""Defines a convenient mix-in class for implementing Qt frontends."""

from jupyter_client import BlockingKernelClient

class BaseFrontendMixin(object):
    """ A mix-in class for implementing Qt frontends.

    To handle messages of a particular type, frontends need only define an
    appropriate handler method. For example, to handle 'stream' messaged, define
    a '_handle_stream(msg)' method.
    """

    #---------------------------------------------------------------------------
    # 'BaseFrontendMixin' concrete interface
    #---------------------------------------------------------------------------
    _kernel_client = None
    _kernel_manager = None
    _blocking_client = None

    @property
    def kernel_client(self):
        """Returns the current kernel client."""
        return self._kernel_client

    @kernel_client.setter
    def kernel_client(self, kernel_client):
        """Disconnect from the current kernel client (if any) and set a new
            kernel client.
        """
        # Disconnect the old kernel client, if necessary.
        old_client = self._kernel_client
        if old_client is not None:
            old_client.started_channels.disconnect(self._started_channels)
            old_client.stopped_channels.disconnect(self._stopped_channels)

            # Disconnect the old kernel client's channels.
            old_client.iopub_channel.message_received.disconnect(self._dispatch)
            old_client.shell_channel.message_received.disconnect(self._dispatch)
            old_client.stdin_channel.message_received.disconnect(self._dispatch)
            old_client.hb_channel.kernel_died.disconnect(
                self._handle_kernel_died)

            # Handle the case where the old kernel client is still listening.
            if old_client.channels_running:
                self._stopped_channels()

        # Set the new kernel client.
        self._kernel_client = kernel_client
        if kernel_client is None:
            return

        # Connect the new kernel client.
        kernel_client.started_channels.connect(self._started_channels)
        kernel_client.stopped_channels.connect(self._stopped_channels)

        # Connect the new kernel client's channels.
        kernel_client.iopub_channel.message_received.connect(self._dispatch)
        kernel_client.shell_channel.message_received.connect(self._dispatch)
        kernel_client.stdin_channel.message_received.connect(self._dispatch)
        # hb_channel
        kernel_client.hb_channel.kernel_died.connect(self._handle_kernel_died)

        # Handle the case where the kernel client started channels before
        # we connected.
        if kernel_client.channels_running:
            self._started_channels()
    
    def _make_blocking_client(self):
        kc = self.kernel_client
        if kc is None:
            return
        
        try:
            blocking_client = kc.blocking_client
        except AttributeError:
            def blocking_client():
                info = kc.get_connection_info()
                bc = BlockingKernelClient(**info)
                bc.session.key = kc.session.key
                return bc

        self._blocking_client = blocking_client()
        self._blocking_client.shell_channel.start()
    
    @property
    def blocking_client(self):
        if self._blocking_client is None:
            self._make_blocking_client()
        return self._blocking_client

    @property
    def kernel_manager(self):
        """The kernel manager, if any"""
        return self._kernel_manager

    @kernel_manager.setter
    def kernel_manager(self, kernel_manager):
        old_man = self._kernel_manager
        if old_man is not None:
            old_man.kernel_restarted.disconnect(self._handle_kernel_restarted)

        self._kernel_manager = kernel_manager
        if kernel_manager is None:
            return

        kernel_manager.kernel_restarted.connect(self._handle_kernel_restarted)

    #---------------------------------------------------------------------------
    # 'BaseFrontendMixin' abstract interface
    #---------------------------------------------------------------------------

    def _handle_kernel_died(self, since_last_heartbeat):
        """ This is called when the ``kernel_died`` signal is emitted.

        This method is called when the kernel heartbeat has not been
        active for a certain amount of time.
        This is a strictly passive notification -
        the kernel is likely being restarted by its KernelManager.

        Parameters
        ----------
        since_last_heartbeat : float
            The time since the heartbeat was last received.
        """

    def _handle_kernel_restarted(self):
        """ This is called when the ``kernel_restarted`` signal is emitted.

        This method is called when the kernel has been restarted by the
        autorestart mechanism.

        Parameters
        ----------
        since_last_heartbeat : float
            The time since the heartbeat was last received.
        """
    def _started_kernel(self):
        """Called when the KernelManager starts (or restarts) the kernel subprocess.
        Channels may or may not be running at this point.
        """

    def _started_channels(self):
        """ Called when the KernelManager channels have started listening or
            when the frontend is assigned an already listening KernelManager.
        """

    def _stopped_channels(self):
        """ Called when the KernelManager channels have stopped listening or
            when a listening KernelManager is removed from the frontend.
        """

    #---------------------------------------------------------------------------
    # 'BaseFrontendMixin' protected interface
    #---------------------------------------------------------------------------

    def _dispatch(self, msg):
        """ Calls the frontend handler associated with the message type of the
            given message.
        """
        msg_type = msg['header']['msg_type']
        handler = getattr(self, '_handle_' + msg_type, None)
        if handler:
            handler(msg)
    
    def from_here(self, msg):
        """Return whether a message is from this session"""
        session_id = self._kernel_client.session.session
        return msg['parent_header'].get("session", session_id) == session_id
    
    def include_output(self, msg):
        """Return whether we should include a given output message"""
        if self._hidden:
            return False
        from_here = self.from_here(msg)
        if msg['msg_type'] == 'execute_input':
            # only echo inputs not from here
            return self.include_other_output and not from_here
        
        if self.include_other_output:
            return True
        else:
            return from_here