/usr/lib/python3/dist-packages/UM/FlameProfiler.py is in python3-uranium 3.1.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Uranium is released under the terms of the LGPLv3 or higher.
import functools
import json
import math
import os
import threading
import time
from contextlib import contextmanager
from typing import List

from PyQt5.QtCore import pyqtSlot as pyqt5PyqtSlot

from UM.Logger import Logger
# A simple profiler which produces data suitable for viewing as a flame graph
# when using the Big Flame Graph plugin.
#
# An example of code which uses this profiling data is this Cura plugin:
# https://github.com/sedwards2009/cura-big-flame-graph
#
# Set the environment variable URANIUM_FLAME_PROFILER to something before
# starting the application to make the profiling code available.
## Tell whether the profiling code is available.
#
# \return \type{bool} True if the environment variable URANIUM_FLAME_PROFILER
#         is set (to any value, even an empty one).
def enabled():
    return os.getenv("URANIUM_FLAME_PROFILER") is not None
# True while profile data is being recorded. Read by isRecordingProfile() and
# switched on/off by updateProfileConfig().
record_profile = False # type: bool
# Profiling data is built up of a tree of these kinds of nodes. Each node
# has a name, start time, end time, and a list of children nodes which are
# other functions/methods which were called by this function.
class _ProfileCallNode:
    ## Create a node.
    #
    # \param name \type{str} Name identifying this call in the report.
    # \param line_number Line number written into the "stack" entry of the JSON.
    # \param start_time Time at which the call started (profileCall() passes
    #        time.perf_counter() values, i.e. seconds).
    # \param end_time Time at which the call ended, on the same clock.
    # \param children \type{List[_ProfileCallNode]} Calls made by this call, or
    #        None for "no children".
    def __init__(self, name, line_number, start_time, end_time, children):
        self.__name = name
        self.__line_number = line_number
        self.__start_time = start_time
        self.__end_time = end_time
        self.__children = children if children is not None else []  # type: List[_ProfileCallNode]

    ## \return The start time given at construction.
    def getStartTime(self):
        return self.__start_time

    ## \return The end time given at construction.
    def getEndTime(self):
        return self.__end_time

    ## \return End time minus start time.
    def getDuration(self):
        return self.__end_time - self.__start_time

    ## Serialise this node and all of its children to JSON text.
    #
    # \param root \type{bool} When True the node is wrapped in the top level
    #        document ("c" / "version"); otherwise only the node itself is
    #        returned.
    # \return \type{str} JSON text.
    def toJSON(self, root=False):
        if not root:
            return self._plainToJSON()

        duration = str(self.getDuration())
        return "\n".join([
            '',
            '{',
            '"c": {',
            '"callStats": ' + self._plainToJSON() + ',',
            # Key spelling kept exactly as it was.
            # NOTE(review): looks like a typo for "sampleInterval" - confirm
            # against the consumer of this data before changing it.
            '"sampleIterval": 1,',
            '"objectName": "Cura",',
            '"runTime": ' + duration + ',',
            '"totalSamples": ' + duration,
            '},',
            '"version": "0.34"',
            '}',
            '',
        ])

    ## Serialise this node (without the top level wrapper) to JSON text.
    #
    # The name is passed through json.dumps() so that quotes, backslashes and
    # control characters in it are escaped; splicing the raw name between
    # literal quotes produced invalid JSON for such names.
    #
    # \return \type{str} JSON text for this node and, recursively, its children.
    def _plainToJSON(self):
        # ensure_ascii=False keeps non-ASCII names as they are, so for ordinary
        # names the output is unchanged.
        name_json = json.dumps(self.__name, ensure_ascii=False)
        code_json = json.dumps("Code: " + self.__name, ensure_ascii=False)
        duration = str(self.getDuration())
        children_json = ",\n".join(kid.toJSON() for kid in self.__children)
        return "\n".join([
            '{',
            '"stack": [',
            name_json + ',',
            code_json + ',',
            str(self.__line_number) + ',',
            duration,
            '],',
            '"sampleCount": ' + duration + ',',
            '"children": [',
            children_json,
            ']',
            '}',
            '',
        ])
# Stack of child lists, one per profiled call that is currently in progress.
# profileCall() pushes an empty list when a profiled block starts and pops it
# when the block ends; a recorded node is appended to the list that is then on
# top. Element 0 collects the top level calls and is what getProfileData()
# reads.
child_accu_stack = [ [] ] # type: List[List[_ProfileCallNode]]
# Pending requests, set by clearProfileData(), startRecordingProfileData() and
# stopRecordingProfileData(). updateProfileConfig() applies and resets them.
clear_profile_requested = False
record_profile_requested = False
stop_record_profile_requested = False
## Fetch the accumulated profile data.
#
# The top level calls are wrapped in one unnamed node that spans from the start
# of the first call to the end of the last one, with the gaps between calls
# filled in.
#
# \return \type{_ProfileCallNode} or None if there is no data.
def getProfileData():
    top_level_calls = child_accu_stack[0]
    if not top_level_calls:
        return None

    first_start = top_level_calls[0].getStartTime()
    last_end = top_level_calls[-1].getEndTime()
    padded_calls = _fillInProfileSpaces(first_start, last_end, top_level_calls)
    return _ProfileCallNode("", 0, first_start, last_end, padded_calls)
## Request that all profile data is erased.
#
# This only raises a flag; the data is actually dropped by
# updateProfileConfig().
def clearProfileData():
    global clear_profile_requested

    clear_profile_requested = True
## Request that recording of profile data starts.
#
# Any pending stop request is withdrawn first. Recording itself is switched on
# by updateProfileConfig().
def startRecordingProfileData():
    global record_profile_requested, stop_record_profile_requested

    # Targets are assigned left to right: the stop request is cleared before
    # the start request is raised.
    stop_record_profile_requested, record_profile_requested = False, True
## Request that recording of profile data stops.
#
# This only raises a flag; recording is switched off by updateProfileConfig().
def stopRecordingProfileData():
    global stop_record_profile_requested

    stop_record_profile_requested = True
## Pad a list of calls with unnamed nodes so that it covers a whole time span.
#
# An unnamed, childless node is inserted wherever one call does not start in
# the same millisecond in which the previous one ended (or in which the span
# started), and after the last call if it ends in a different millisecond than
# the span does.
#
# \param start_time Start of the span to cover.
# \param end_time End of the span to cover.
# \param profile_call_list \type{List[_ProfileCallNode]} The calls, in the
#        order in which they were recorded.
# \return \type{List[_ProfileCallNode]} A new list with the calls and the gap nodes.
def _fillInProfileSpaces(start_time, end_time, profile_call_list):
    padded = []
    cursor = start_time  # End of what has been covered so far.

    for call in profile_call_list:
        call_start = call.getStartTime()
        if secondsToMS(call_start) != secondsToMS(cursor):
            padded.append(_ProfileCallNode("", 0, cursor, call_start, []))
        padded.append(call)
        cursor = call.getEndTime()

    if secondsToMS(cursor) != secondsToMS(end_time):
        padded.append(_ProfileCallNode("", 0, cursor, end_time, []))

    return padded
## Convert seconds to whole milliseconds, rounding down.
#
# \param value Time in seconds.
# \return \type{int} The number of whole milliseconds.
def secondsToMS(value):
    milliseconds = value * 1000
    return math.floor(milliseconds)
## Profile a block of code.
#
# Use this context manager to wrap and profile a block of code. When the
# profiler is not enabled the block simply runs. Blocks which take 1ms or less
# are not recorded.
#
# The bookkeeping is done in a finally clause: if the wrapped block raises, the
# exception still propagates to the caller, but the child list pushed for this
# call is popped again. Without that, child_accu_stack would stay one level too
# deep for good after the first exception.
#
# \param name \type{str} The name to use to identify this code in the profile report.
@contextmanager
def profileCall(name):
    if not enabled():
        yield
        return

    start_time = time.perf_counter()
    child_accu_stack.append([])
    try:
        yield
    finally:
        end_time = time.perf_counter()
        child_values = child_accu_stack.pop()
        if (end_time - start_time) > 0.001:  # Filter out small durations (< 1ms)
            filled_children = _fillInProfileSpaces(start_time, end_time, child_values)
            call_stat = _ProfileCallNode(name, 0, start_time, end_time, filled_children)
            child_accu_stack[-1].append(call_stat)
## Return whether we are recording profiling information.
#
# Recording only applies to the main thread: when called from any other thread
# the result is False even while recording is switched on.
#
# \return \type{bool} True if we are recording and this is the main thread.
def isRecordingProfile():
    if not record_profile:
        return record_profile
    return threading.current_thread() is threading.main_thread()
## Apply the pending clear / start / stop requests.
#
# Nothing happens while a profiled call is in progress. Otherwise each pending
# request is reset and applied, in this order: clear the data, start
# recording, stop recording.
def updateProfileConfig():
    global child_accu_stack, record_profile
    global clear_profile_requested, record_profile_requested, stop_record_profile_requested

    # We can only update the active profiling config when we are not deeply nested inside profiled calls.
    if len(child_accu_stack) > 1:
        return

    if clear_profile_requested:
        clear_profile_requested = False
        child_accu_stack = [[]]

    if record_profile_requested:
        record_profile_requested = False
        record_profile = True
        Logger.log('d', 'Starting record record_profile_requested')

    if stop_record_profile_requested:
        stop_record_profile_requested = False
        record_profile = False
        Logger.log('d', 'Stopping record stop_record_profile_requested')
## Decorator which can be manually applied to methods to record profiling information.
#
# When the profiler is not enabled the function is returned untouched. When it
# is, every call made while recording is timed under the function's qualified
# name.
#
# \param function \type{Callable} The function or method to profile.
# \return \type{Callable} The wrapper, or function itself if the profiler is not enabled.
def profile(function):
    if not enabled():
        return function

    @functools.wraps(function)
    def runIt(*args, **kwargs):
        if not isRecordingProfile():
            return function(*args, **kwargs)
        with profileCall(function.__qualname__):
            return function(*args, **kwargs)

    return runIt
## Drop in replacement for PyQt5's pyqtSlot decorator which records profiling information.
#
# See the PyQt5 documentation for information about pyqtSlot. All arguments are
# handed on to it unchanged. When the profiler is enabled, calls to the slot
# made while recording are timed under "[SLOT] " plus the function's qualified
# name.
#
# \return \type{Callable} A decorator to apply to the slot function.
def pyqtSlot(*args, **kwargs):
    if not enabled():
        def dontWrapIt(function):
            return pyqt5PyqtSlot(*args, **kwargs)(function)

        return dontWrapIt

    def wrapIt(function):
        @functools.wraps(function)
        def wrapped(*args2, **kwargs2):
            if not isRecordingProfile():
                return function(*args2, **kwargs2)
            with profileCall("[SLOT] " + function.__qualname__):
                return function(*args2, **kwargs2)

        slot_decorator = pyqt5PyqtSlot(*args, **kwargs)
        return slot_decorator(wrapped)

    return wrapIt
|