/usr/lib/python3/dist-packages/jupyter_client/tests/test_client.py is in python3-jupyter-client 5.2.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | """Tests for the KernelClient"""
# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
import os
pjoin = os.path.join
from unittest import TestCase
from jupyter_client.kernelspec import KernelSpecManager, NoSuchKernel, NATIVE_KERNEL_NAME
from ..manager import start_new_kernel
from .utils import test_env
import pytest
from ipython_genutils.py3compat import string_types
from IPython.utils.capture import capture_output
TIMEOUT = 30
class TestKernelClient(TestCase):
def setUp(self):
self.env_patch = test_env()
self.env_patch.start()
self.addCleanup(self.env_patch.stop)
try:
KernelSpecManager().get_kernel_spec(NATIVE_KERNEL_NAME)
except NoSuchKernel:
pytest.skip()
self.km, self.kc = start_new_kernel(kernel_name=NATIVE_KERNEL_NAME)
self.addCleanup(self.kc.stop_channels)
self.addCleanup(self.km.shutdown_kernel)
def test_execute_interactive(self):
kc = self.kc
with capture_output() as io:
reply = kc.execute_interactive("print('hello')", timeout=TIMEOUT)
assert 'hello' in io.stdout
assert reply['content']['status'] == 'ok'
def _check_reply(self, reply_type, reply):
self.assertIsInstance(reply, dict)
self.assertEqual(reply['header']['msg_type'], reply_type + '_reply')
self.assertEqual(reply['parent_header']['msg_type'], reply_type + '_request')
def test_history(self):
kc = self.kc
msg_id = kc.history(session=0)
self.assertIsInstance(msg_id, string_types)
reply = kc.history(session=0, reply=True, timeout=TIMEOUT)
self._check_reply('history', reply)
def test_inspect(self):
kc = self.kc
msg_id = kc.inspect('who cares')
self.assertIsInstance(msg_id, string_types)
reply = kc.inspect('code', reply=True, timeout=TIMEOUT)
self._check_reply('inspect', reply)
def test_complete(self):
kc = self.kc
msg_id = kc.complete('who cares')
self.assertIsInstance(msg_id, string_types)
reply = kc.complete('code', reply=True, timeout=TIMEOUT)
self._check_reply('complete', reply)
def test_kernel_info(self):
kc = self.kc
msg_id = kc.kernel_info()
self.assertIsInstance(msg_id, string_types)
reply = kc.kernel_info(reply=True, timeout=TIMEOUT)
self._check_reply('kernel_info', reply)
def test_comm_info(self):
kc = self.kc
msg_id = kc.comm_info()
self.assertIsInstance(msg_id, string_types)
reply = kc.comm_info(reply=True, timeout=TIMEOUT)
self._check_reply('comm_info', reply)
def test_shutdown(self):
kc = self.kc
msg_id = kc.shutdown()
self.assertIsInstance(msg_id, string_types)
reply = kc.shutdown(reply=True, timeout=TIMEOUT)
self._check_reply('shutdown', reply)
|