/usr/lib/python3/dist-packages/etcd_settings/utils.py is in python3-django-etcd-settings 0.1.13+dfsg-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | import copy
import datetime
import json
import logging
import os
import sys
from collections import Mapping
from functools import wraps
from threading import Thread
import six
from dateutil.parser import parse as parse_date
def attrs_to_dir(mod):
data = {}
for attr in dir(mod):
if attr == attr.upper():
data[attr] = getattr(mod, attr)
return data
def dict_rec_update(d, u):
"""Nested update of a dict, handy for overriding settings"""
# https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth
for k, v in u.items():
if isinstance(v, Mapping):
r = dict_rec_update(d.get(k, {}), v)
d[k] = r
else:
d[k] = u[k]
return d
class Task(Thread):
"""
The Threaded object returned by the @threaded decorator below
"""
def __init__(self, method, *args, **kwargs):
super(Task, self).__init__()
self.method = method
self.args = args
self.kwargs = kwargs
self._result = None
self.__exc_info = None
def run(self):
try:
self._result = self.method(*self.args, **self.kwargs)
except:
self.__exc_info = sys.exc_info()
@property
def result(self):
self.join()
if self.__exc_info is not None:
six.reraise(*self.__exc_info)
return self._result
def threaded(function=None, daemon=False):
def wrapper_factory(func):
@wraps(func)
def get_thread(*args, **kwargs):
t = Task(func, *args, **kwargs)
if daemon:
t.daemon = True
t.start()
return t
return get_thread
if function:
return wrapper_factory(function)
else:
return wrapper_factory
class CustomJSONEncoder(json.JSONEncoder):
custom_type_key = '_custom_type'
custom_type_value_key = 'value'
DECODERS = {'datetime': parse_date}
def default(self, obj):
if isinstance(obj, datetime.datetime):
return {self.custom_type_key: 'datetime',
self.custom_type_value_key: obj.isoformat()}
else:
return super(CustomJSONEncoder, self).default(obj)
def custom_json_decoder_hook(obj):
ct = obj.get(CustomJSONEncoder.custom_type_key, None)
if ct is not None:
value = obj.get(CustomJSONEncoder.custom_type_value_key)
return CustomJSONEncoder.DECODERS[ct](value)
else:
return obj
def find_project_root(root_indicator='manage.py', current=os.getcwd()):
parent = os.path.dirname(current)
if root_indicator in os.listdir(current):
return current
elif current == parent:
# We are at '/' already!
raise IOError('Not found: {}'.format(root_indicator))
else:
return find_project_root(root_indicator, parent)
def copy_if_mutable(value):
"""
Copy function handling mutable values (only dicts and lists).
"""
if type(value) in (dict, list):
return copy.deepcopy(value)
return value
def byteify(input):
if isinstance(input, dict):
return {byteify(key): byteify(value) for key, value in input.items()}
elif isinstance(input, list):
return [byteify(element) for element in input]
elif (sys.version_info.major == 2) and (isinstance(input, unicode)):
return input.encode('utf-8')
else:
return input
class IgnoreMaxEtcdRetries(logging.Filter):
"""
Skip etcd.client MaxRetryError on timeout
"""
def __init__(self, name='etcd.client'):
super(IgnoreMaxEtcdRetries, self).__init__(name)
def filter(self, record):
msg = '{}'.format(record.args)
return not (
'MaxRetryError' in msg and
'Read timed out' in msg
)
|