/usr/lib/python3/dist-packages/jupyter_client/tests/utils.py is in python3-jupyter-client 4.4.0-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""Testing utils for jupyter_client tests
"""
import os
pjoin = os.path.join
try:
from unittest.mock import patch
except ImportError:
from mock import patch
from ipython_genutils.tempdir import TemporaryDirectory
class test_env(object):
    """Set Jupyter path variables to a temporary directory

    While active, ``JUPYTER_CONFIG_DIR``, ``JUPYTER_DATA_DIR``,
    ``JUPYTER_RUNTIME_DIR`` and ``IPYTHONDIR`` in ``os.environ`` point at
    sub-paths of a freshly created temporary directory.  Only the
    environment variables are set here; the sub-paths themselves are not
    created.

    Useful as a context manager (``with test_env() as path:``) or with
    explicit ``start()``/``stop()`` calls.
    """

    def start(self):
        """Create the temporary directory and patch ``os.environ``."""
        self.test_dir = td = TemporaryDirectory()
        self.env_patch = patch.dict(os.environ, {
            'JUPYTER_CONFIG_DIR': pjoin(td.name, 'jupyter'),
            'JUPYTER_DATA_DIR': pjoin(td.name, 'jupyter_data'),
            'JUPYTER_RUNTIME_DIR': pjoin(td.name, 'jupyter_runtime'),
            'IPYTHONDIR': pjoin(td.name, 'ipython'),
        })
        self.env_patch.start()

    def stop(self):
        """Restore ``os.environ`` and remove the temporary directory."""
        try:
            self.env_patch.stop()
        finally:
            # Always remove the temporary directory, even when restoring
            # the environment fails, so a failed stop() cannot leak it.
            self.test_dir.cleanup()

    def __enter__(self):
        """Start the patched environment and return the temp directory path."""
        self.start()
        return self.test_dir.name

    def __exit__(self, *exc_info):
        """Stop the patched environment; exceptions are not suppressed."""
        self.stop()
def execute(code='', kc=None, **kwargs):
    """Send an execute request and validate the messages that follow it.

    The request is sent with ``kc.execute(code=code, **kwargs)``.  The shell
    reply is validated as an ``execute_reply`` and the first iopub message
    as a ``status`` message whose execution state is ``busy``.  Unless a
    truthy ``silent`` keyword was passed, the next iopub message is
    validated as an ``execute_input`` echoing ``code``.

    Returns a ``(msg_id, reply_content)`` tuple, where ``reply_content`` is
    the ``content`` entry of the shell reply.
    """
    from .test_message_spec import validate_message

    # NOTE(review): KC, TIMEOUT and nt are not defined or imported in the
    # visible part of this module -- confirm they are provided before
    # relying on this helper.
    client = KC if kc is None else kc
    request_id = client.execute(code=code, **kwargs)

    shell_reply = client.get_shell_msg(timeout=TIMEOUT)
    validate_message(shell_reply, 'execute_reply', request_id)

    status_msg = client.get_iopub_msg(timeout=TIMEOUT)
    validate_message(status_msg, 'status', request_id)
    nt.assert_equal(status_msg['content']['execution_state'], 'busy')

    # Silent requests skip the execute_input check.
    if kwargs.get('silent'):
        return request_id, shell_reply['content']

    input_msg = client.get_iopub_msg(timeout=TIMEOUT)
    validate_message(input_msg, 'execute_input', request_id)
    nt.assert_equal(input_msg['content']['code'], code)
    return request_id, shell_reply['content']
|