/usr/lib/python3/dist-packages/Nagstamon/Helpers.py is in nagstamon 3.0.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 | # encoding: utf-8
# Nagstamon - Nagios status monitor for your desktop
# Copyright (C) 2008-2014 Henri Wahl <h.wahl@ifw-dresden.de> et al.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
import datetime
# import subprocess # not used
import re
import sys
import traceback
import os
import psutil
import getpass
import webbrowser
# import md5 for centreon url autologin encoding
from hashlib import md5
from Nagstamon.Config import conf
# queue.Queue() needs threading module which might be not such a good idea to be used
# because QThread is already in use
# get debug queue from nagstamon.py
# ##debug_queue = sys.modules['__main__'].debug_queue
# states needed for gravity comparison for notification and Generic.py
# STATES = ['UP', 'UNKNOWN', 'WARNING', 'CRITICAL', 'UNREACHABLE', 'DOWN']
STATES = ['UP', 'UNKNOWN', 'INFORMATION', 'WARNING', 'AVERAGE', 'HIGH', 'CRITICAL', 'DISASTER', 'UNREACHABLE', 'DOWN']
# sound at the moment is only available for these states
STATES_SOUND = ['WARNING', 'CRITICAL', 'DOWN']
def not_empty(x):
'''
tiny helper function for BeautifulSoup in server Generic.py to filter text elements
'''
return bool(x.replace(' ', '').strip())
def is_found_by_re(string, pattern, reverse):
"""
helper for context menu actions in context menu - hosts and services might be filtered out
also useful for services and hosts and status information
"""
pattern = re.compile(pattern)
if len(pattern.findall(string)) > 0:
if str(reverse) == "True":
return False
else:
return True
else:
if str(reverse) == "True":
return True
else:
return False
def host_is_filtered_out_by_re(host, conf=None):
"""
helper for applying RE filters in Generic.GetStatus()
"""
try:
if conf.re_host_enabled is True:
return is_found_by_re(host, conf.re_host_pattern, conf.re_host_reverse)
# if RE are disabled return True because host is not filtered
return False
except Exception:
traceback.print_exc(file=sys.stdout)
def ServiceIsFilteredOutByRE(service, conf=None):
"""
helper for applying RE filters in Generic.GetStatus()
"""
try:
if conf.re_service_enabled is True:
return is_found_by_re(service, conf.re_service_pattern, conf.re_service_reverse)
# if RE are disabled return True because host is not filtered
return False
except Exception:
traceback.print_exc(file=sys.stdout)
def StatusInformationIsFilteredOutByRE(status_information, conf=None):
"""
helper for applying RE filters in Generic.GetStatus()
"""
try:
if conf.re_status_information_enabled is True:
return is_found_by_re(status_information, conf.re_status_information_pattern, conf.re_status_information_reverse)
# if RE are disabled return True because host is not filtered
return False
except Exception:
traceback.print_exc(file=sys.stdout)
def CriticalityIsFilteredOutByRE(criticality, conf=None):
"""
helper for applying RE filters in Generic.GetStatus()
"""
try:
if conf.re_criticality_enabled is True:
return is_found_by_re(criticality, conf.re_criticality_pattern, conf.re_criticality_reverse)
# if RE are disabled return True because host is not filtered
return False
except Exception:
traceback.print_exc(file=sys.stdout)
def HumanReadableDurationFromSeconds(seconds):
"""
convert seconds given by Opsview to the form Nagios gives them
like 70d 3h 34m 34s
"""
timedelta = str(datetime.timedelta(seconds=int(seconds)))
try:
if timedelta.find("day") == -1:
hms = timedelta.split(":")
if len(hms) == 1:
return "%02ds" % (hms[0])
elif len(hms) == 2:
return "%02dm %02ds" % (hms[0], hms[1])
else:
return "%sh %02dm %02ds" % (hms[0], hms[1], hms[2])
else:
# waste is waste - does anyone need it?
days, waste, hms = str(timedelta).split(" ")
hms = hms.split(":")
return "%sd %sh %02dm %02ds" % (days, hms[0], hms[1], hms[2])
except Exception:
# in case of any error return seconds we got
return seconds
def HumanReadableDurationFromTimestamp(timestamp):
"""
Thruk server supplies timestamp of latest state change which
has to be subtracted from .now()
"""
try:
td = datetime.datetime.now() - datetime.datetime.fromtimestamp(int(timestamp))
h = int(td.seconds / 3600)
m = int(td.seconds % 3600 / 60)
s = int(td.seconds % 60)
if td.days > 0:
return "%sd %sh %02dm %02ds" % (td.days, h, m, s)
elif h > 0:
return "%sh %02dm %02ds" % (h, m, s)
elif m > 0:
return "%02dm %02ds" % (m, s)
else:
return "%02ds" % (s)
except Exception:
traceback.print_exc(file=sys.stdout)
# unified machine readable date might go back to module Actions
def MachineSortableDate(raw):
"""
Try to compute machine readable date for all types of monitor servers
"""
# dictionary for duration date string components
d = {'M': 0, 'w': 0, 'd': 0, 'h': 0, 'm': 0, 's': 0}
# if for some reason the value is empty/none make it compatible: 0s
if raw is None:
raw = '0s'
# Check_MK style - added new variants in 1.4.x, based on abbreviations with spaces :-(
if ('-' in raw and ':' in raw) or\
('sec' in raw or 'min' in raw or 'hrs' in raw or 'days' in raw or\
' s' in raw or ' m' in raw or ' h' in raw or ' d' in raw):
# check_mk has different formats - if duration takes too long it changes its scheme
if '-' in raw and ':' in raw:
datepart, timepart = raw.split(' ')
# need to convert years into months for later comparison
Y, M, D = datepart.split('-')
d['M'] = int(Y) * 12 + int(M)
d['d'] = int(D)
# time does not need to be changed
h, m, s = timepart.split(':')
d['h'], d['m'], d['s'] = int(h), int(m), int(s)
del datepart, timepart, Y, M, D, h, m, s
else:
# recalculate a timedelta of the given value
if 'sec' in raw or ' s' in raw:
d['s'] = raw.split(' ')[0].split('.')[0]
delta = datetime.datetime.now() - datetime.timedelta(seconds=int(d['s']))
elif 'min' in raw or ' m' in raw:
d['m'] = raw.split(' ')[0].split('.')[0]
delta = datetime.datetime.now() - datetime.timedelta(minutes=int(d['m']))
elif 'hrs' in raw or ' h' in raw:
d['h'] = raw.split(' ')[0]
delta = datetime.datetime.now() - datetime.timedelta(hours=int(d['h']))
elif 'days' in raw or ' d' in raw:
d['d'] = raw.split(' ')[0]
delta = datetime.datetime.now() - datetime.timedelta(days=int(d['d']))
else:
delta = datetime.datetime.now()
Y, M, d['d'], d['h'], d['m'], d['s'] = delta.strftime('%Y %m %d %H %M %S').split(' ')
# need to convert years into months for later comparison
d['M'] = int(Y) * 12 + int(M)
# int-ify d
for i in d:
# workaround to make values negative to fix Check_MK's different order
d[i] = -int(d[i])
else:
# strip and replace necessary for Nagios duration values,
# split components of duration into dictionary
for c in raw.strip().replace(' ', ' ').split(' '):
number, period = c[0:-1], c[-1]
# attempt to be more robust in case of https://github.com/HenriWahl/Nagstamon/issues/405
try:
d[period] = int(number)
except:
d[period] = 0
del number, period
# convert collected duration data components into seconds for being comparable
return(16934400 * d['M'] + 604800 * d['w'] + 86400 * d['d'] + 3600 * d['h'] + 60 * d['m'] + d['s'])
def MD5ify(string):
"""
makes something md5y of a given username or password for Centreon web interface access
"""
return md5(string).hexdigest()
def lock_config_folder(folder):
'''
Locks the config folder by writing a PID file into it.
The lock is relative to user name and system's boot time.
Returns True on success, False when lock failed
Return True too if there is any locking error - if no locking ins possible it might run as well
This is also the case if some setup uses the nagstamon.config directory which most probably
will be read-only
'''
pidFilePath = os.path.join(folder, 'nagstamon.pid')
try:
# Open the file for rw or create a new one if missing
if os.path.exists(pidFilePath):
mode = 'r+t'
else:
mode = 'wt'
with open(pidFilePath, mode, newline=None) as pidFile:
curPid = os.getpid()
curBootTime = int(psutil.boot_time())
curUserName = getpass.getuser().replace('@', '_').strip()
pid = None
bootTime = None
userName = None
if mode.startswith('r'):
try:
procInfo = pidFile.readline().strip().split('@')
pid = int(procInfo[0])
bootTime = int(procInfo[1])
userName = procInfo[2].strip()
except(ValueError, IndexError):
pass
if pid is not None and bootTime is not None and userName is not None:
# Found a pid stored in the pid file, check if its still running
if bootTime == curBootTime and userName == curUserName and psutil.pid_exists(pid):
return False
pidFile.seek(0)
pidFile.truncate()
pidFile.write('{}@{}@{}'.format(curPid, curBootTime, curUserName))
except Exception as err:
print(err)
return True
# the following functions are used for sorted() in sort_data_array()
def compare_host(item):
return(item.lower())
def compare_service(item):
return(item.lower())
def compare_status(item):
return(STATES.index(item))
def compare_last_check(item):
return(MachineSortableDate(item))
def compare_duration(item):
return(MachineSortableDate(item))
def compare_attempt(item):
return(item)
def compare_status_information(item):
return(item.lower())
def webbrowser_open(url):
"""
decide if default or custom browser is used for various tasks
used by almost all
"""
if conf.use_default_browser:
webbrowser.open(url)
else:
webbrowser.get('{0} %s &'.format(conf.custom_browser)).open(url)
# depending on column different functions have to be used
# 0 + 1 are column "Hosts", 1 + 2 are column "Service" due to extra font flag pictograms
SORT_COLUMNS_FUNCTIONS = {0: compare_host,
1: compare_host,
2: compare_service,
3: compare_service,
4: compare_status,
5: compare_last_check,
6: compare_duration,
7: compare_attempt,
8: compare_status_information,
9: compare_status_information}
|