/usr/lib/python3/dist-packages/imexam/util.py is in python3-imexam 0.8.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 | """Licensed under a 3-clause BSD style license - see LICENSE.rst."""
from __future__ import print_function, division
import os
import sys
import logging
import warnings
from astropy.io import fits
try:
import xpa
_have_xpa = True
except ImportError:
_have_xpa = False
from .version import version as __version__
# To guide any import *
__all__ = ["display_help", "set_logging"]
def find_ds9():
"""Find the local path to the DS9 executable"""
path = "ds9"
for dirname in os.getenv("PATH").split(":"):
possible = os.path.join(dirname, path)
if os.path.isfile(possible):
return possible
return None
def find_xpans():
"""Find the local path to the xpans executable"""
path = "xpans"
for dirname in os.getenv("PATH").split(":"):
possible = os.path.join(dirname, path)
if os.path.exists(possible):
return possible
return None
def list_active_ds9(verbose=True):
"""
Display and/or return information about the DS9 windows currently
registered with the XPA.
Parameters
----------
verbose : bool
If True, prints out all the information about what DS9 windows
are active.
Returns
-------
session_list : list
The list of sessions that have been registered. Each entry in the list
is a list containing the information that xpans yields. Typically the
fourth element in that tuple contains the actual target name.
Notes
-----
when I start a unix socket with connect() the xpa register isn't
seeing it when I call this function. I think because it's only
listening on the inet socket which starts by default in the OS.
That's if xpans is installed on the machine. Otherwise, no
nameserver is running at all. This helps with object cont
"""
session_dict = {}
# only run if XPA/xpans is installed on the machine
if find_xpans():
sessions = None
try:
sessions = xpa.get(b"xpans").decode().strip().split("\n")
if ((sessions is None or len(sessions) < 1) and verbose):
print("No active sessions")
else:
for line in sessions:
classn, name, access, ids, user = tuple(line.split())
session_dict[ids] = (name, user, classn, access)
if verbose:
for line in sessions:
print(line)
except xpa.XpaException:
print("No active sessions registered")
else:
print("XPA nameserver not installed or not on PATH, \
function unavailable")
return session_dict
def display_help():
"""Display RTD html help for the installed verison in a browser window."""
url = "http://imexam.readthedocs.io/"
try:
import webbrowser
# grab the version that's installed
if "dev" not in __version__:
url += "en/v{0:s}/".format(__version__)
webbrowser.open(url)
except ImportError:
warnings.warn(
"webbrowser module not installed, see {0:s} for help".format(url))
raise ImportError
def display_xpa_help():
"""Display help for XPA Acccess Points."""
url = "http://ds9.si.edu/doc/ref/xpa.html"
try:
import webbrowser
webbrowser.open(url)
except ImportError:
warnings.warn(
"webbrowser module not installed, see {0:s} for help".format(url))
raise ImportError
# Set up logging ability for the user
# consider making a private logging level for data retension
def set_logging(filename=None, on=True, level=logging.INFO):
"""Turn on or off logging to file or stdout.
Parameters
----------
filename: str, optional
name of the file for logging information
on: bool, optional
turn logging on or off, will close the file output
in order to turn off stdout logging, set the level
to logging.CRITICAL
level: logging, optional
the logging level to be recorded or displayed
"""
formatter = logging.Formatter('\n%(funcName)s \n%(message)s')
root = logging.getLogger(__name__)
root.setLevel(level)
stream_attached = False
file_attached = False
if on:
# Try to avoid adding duplicate file handlers
if len(root.handlers) > 0:
for handler in root.handlers:
if isinstance(handler, logging.StreamHandler):
stream_attached = True
handler.setLevel(level)
if isinstance(handler, logging.FileHandler):
file_attached = True
raise ValueError("File for logging already specified,\
turn off logging first.")
else:
# to prevent warning in unhandled contexts and messages to stderr
root.addHandler(logging.NullHandler())
if isinstance(filename, str) and not file_attached:
file_handler = logging.FileHandler(filename=filename,
mode='a',
delay=True)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
print("Saving imexam commands to {0:s}".format(repr(filename)))
root.addHandler(file_handler)
if not stream_attached:
# set the stdout stream handler
stdout_handler = logging.StreamHandler(stream=sys.stdout)
stdout_handler.setLevel(logging.INFO)
# stdout_handler.setFormatter(formatter)
root.addHandler(stdout_handler)
# turning the logging off to the file and set level on stream handler
else:
for handler in root.handlers:
# close the file logger
if isinstance(handler, logging.FileHandler):
handler.close()
root.removeHandler(handler)
# set stream logging level
if isinstance(handler, logging.StreamHandler):
handler.setLevel(level)
return root
def check_filetype(filename=None):
"""Check the file to see if it is a multi-extension FITS file
or a simple fits image where the data and header are stored in
the global unit.
Parameters
----------
filename: string
The name of the file to check
"""
log = logging.getLogger(__name__)
if filename is None:
raise ValueError("No filename provided")
else:
try:
mef_file = fits.getval(filename, ext=0, keyword='EXTEND')
except KeyError:
mef_file = False
except IOError:
log.warning("Problem opening file {0:s}".format(repr(filename)))
raise IOError("Error opening {0:s}".format(filename))
# double check for lying liars, should at least have 1 extension
# and XTENSION is a required keyword
if mef_file:
try:
fits.getval(filename, ext=1, keyword='XTENSION')
except (KeyError, IndexError):
mef_file = False
return mef_file
def verify_filename(filename=None, extver=None, extname=None):
"""Verify the filename exist and split it for extension information.
If the user has given an extension, extension name or some combination of
those, return the full filename, extension and version tuple.
Parameters
----------
filename: string
The name of the file to verify
extver: int
extsion number which corresponds to extname
extname: string
the name of the extension
"""
if filename is None:
print("No filename provided")
else:
try:
if "[" in filename:
splitstr = filename.split("[")
shortname = splitstr[0]
if "," in splitstr[1]:
extname = splitstr[1].split(",")[0]
extver = int(splitstr[1].split(",")[1][0])
else:
extver = int(filename.split("[")[1][0])
else:
shortname = os.path.abspath(filename)
except IndexError as e:
print("Exception: {0}".format(repr(e)))
raise IndexError
return shortname, extname, extver
|