/usr/lib/python2.7/dist-packages/compose/utils.py is in docker-compose 1.5.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 | import codecs
import hashlib
import json
import json.decoder
import logging
import sys
from threading import Thread
import six
from docker.errors import APIError
from six.moves.queue import Empty
from six.moves.queue import Queue
log = logging.getLogger(__name__)
json_decoder = json.JSONDecoder()
def parallel_execute(objects, obj_callable, msg_index, msg):
"""
For a given list of objects, call the callable passing in the first
object we give it.
"""
stream = get_output_stream(sys.stdout)
lines = []
for obj in objects:
write_out_msg(stream, lines, msg_index(obj), msg)
q = Queue()
def inner_execute_function(an_callable, parameter, msg_index):
error = None
try:
result = an_callable(parameter)
except APIError as e:
error = e.explanation
result = "error"
except Exception as e:
error = e
result = 'unexpected_exception'
q.put((msg_index, result, error))
for an_object in objects:
t = Thread(
target=inner_execute_function,
args=(obj_callable, an_object, msg_index(an_object)),
)
t.daemon = True
t.start()
done = 0
errors = {}
total_to_execute = len(objects)
while done < total_to_execute:
try:
msg_index, result, error = q.get(timeout=1)
if result == 'unexpected_exception':
errors[msg_index] = result, error
if result == 'error':
errors[msg_index] = result, error
write_out_msg(stream, lines, msg_index, msg, status='error')
else:
write_out_msg(stream, lines, msg_index, msg)
done += 1
except Empty:
pass
if not errors:
return
stream.write("\n")
for msg_index, (result, error) in errors.items():
stream.write("ERROR: for {} {} \n".format(msg_index, error))
if result == 'unexpected_exception':
raise error
def get_output_stream(stream):
if six.PY3:
return stream
return codecs.getwriter('utf-8')(stream)
def stream_as_text(stream):
"""Given a stream of bytes or text, if any of the items in the stream
are bytes convert them to text.
This function can be removed once docker-py returns text streams instead
of byte streams.
"""
for data in stream:
if not isinstance(data, six.text_type):
data = data.decode('utf-8', 'replace')
yield data
def line_splitter(buffer, separator=u'\n'):
index = buffer.find(six.text_type(separator))
if index == -1:
return None
return buffer[:index + 1], buffer[index + 1:]
def split_buffer(stream, splitter=None, decoder=lambda a: a):
"""Given a generator which yields strings and a splitter function,
joins all input, splits on the separator and yields each chunk.
Unlike string.split(), each chunk includes the trailing
separator, except for the last one if none was found on the end
of the input.
"""
splitter = splitter or line_splitter
buffered = six.text_type('')
for data in stream_as_text(stream):
buffered += data
while True:
buffer_split = splitter(buffered)
if buffer_split is None:
break
item, buffered = buffer_split
yield item
if buffered:
yield decoder(buffered)
def json_splitter(buffer):
"""Attempt to parse a json object from a buffer. If there is at least one
object, return it and the rest of the buffer, otherwise return None.
"""
try:
obj, index = json_decoder.raw_decode(buffer)
rest = buffer[json.decoder.WHITESPACE.match(buffer, index).end():]
return obj, rest
except ValueError:
return None
def json_stream(stream):
"""Given a stream of text, return a stream of json objects.
This handles streams which are inconsistently buffered (some entries may
be newline delimited, and others are not).
"""
return split_buffer(stream, json_splitter, json_decoder.decode)
def write_out_msg(stream, lines, msg_index, msg, status="done"):
"""
Using special ANSI code characters we can write out the msg over the top of
a previous status message, if it exists.
"""
obj_index = msg_index
if msg_index in lines:
position = lines.index(obj_index)
diff = len(lines) - position
# move up
stream.write("%c[%dA" % (27, diff))
# erase
stream.write("%c[2K\r" % 27)
stream.write("{} {} ... {}\r".format(msg, obj_index, status))
# move back down
stream.write("%c[%dB" % (27, diff))
else:
diff = 0
lines.append(obj_index)
stream.write("{} {} ... \r\n".format(msg, obj_index))
stream.flush()
def json_hash(obj):
dump = json.dumps(obj, sort_keys=True, separators=(',', ':'))
h = hashlib.sha256()
h.update(dump.encode('utf8'))
return h.hexdigest()
|