/usr/lib/python2.7/dist-packages/cylc/graphing.py is in python-cylc 7.6.0-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 | #!/usr/bin/env python
# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2017 NIWA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Cylc suite graphing module. Modules relying on this should test for
ImportError due to pygraphviz/graphviz not being installed."""
import pygraphviz
from cylc.cycling.loader import get_point, get_point_relative
from cylc.task_id import TaskID
class CGraphPlain(pygraphviz.AGraph):
"""Directed Acyclic Graph class for cylc dependency graphs."""
def __init__(self, title, suite_polling_tasks={}):
self.title = title
pygraphviz.AGraph.__init__(self, directed=True, strict=True)
# graph attributes
# - label (suite name)
self.graph_attr['label'] = title
self.suite_polling_tasks = suite_polling_tasks
def node_attr_by_taskname(self, node_string):
try:
name = TaskID.split(node_string)[0]
except ValueError:
# Special node?
if node_string.startswith("__remove_"):
return []
raise
if name in self.task_attr:
return self.task_attr[name]
else:
return []
def style_edge(self, left, right):
pass
def style_node(self, node_string):
node = self.get_node(node_string)
try:
name, point_string = TaskID.split(node_string)
except ValueError:
# Special node?
if node_string.startswith("__remove_"):
node.attr['style'] = 'dashed'
node.attr['label'] = u'\u2702'
return
raise
label = name
if name in self.suite_polling_tasks:
label += "\\n" + self.suite_polling_tasks[name][3]
label += "\\n" + point_string
node.attr['label'] = label
node.attr['URL'] = node_string
def cylc_remove_nodes_from(self, nodes):
"""Remove nodes, returning extra edge structure if possible.
Each group of connected to-be-removed nodes is replaced by a
single special node to preserve dependency info between the
remaining nodes.
"""
if not nodes:
return
existing_nodes = set(self.nodes())
remove_nodes = set(nodes)
remove_node_groups = {}
groups = {}
group_new_nodes = {}
edges = self.edges()
incoming_remove_edges = []
outgoing_remove_edges = []
internal_remove_edges = []
for l_node, r_node in edges:
if l_node in remove_nodes:
if r_node in remove_nodes:
# This is an edge between connected nuke nodes.
internal_remove_edges.append((l_node, r_node))
else:
# This is an edge between nuke and normal nodes.
outgoing_remove_edges.append((l_node, r_node))
elif r_node in remove_nodes:
incoming_remove_edges.append((l_node, r_node))
if not outgoing_remove_edges:
# Preserving edges doesn't matter - ditch this whole set.
self.remove_nodes_from(nodes)
return
# Loop through all connected nuke nodes and group them up.
group_num = -1
for l_node, r_node in sorted(internal_remove_edges):
l_group = remove_node_groups.get(l_node)
r_group = remove_node_groups.get(r_node)
if l_group is None:
if r_group is None:
# Create a new group for l_node and r_node.
group_num += 1
groups[group_num] = set((l_node, r_node))
remove_node_groups[l_node] = group_num
remove_node_groups[r_node] = group_num
else:
# r_node already in a group, l_node not - add l_node.
groups[r_group].add(l_node)
remove_node_groups[l_node] = r_group
elif r_group is None:
# l_node already in a group, r_node not - add r_node.
groups[l_group].add(r_node)
remove_node_groups[r_node] = l_group
elif l_group != r_group:
# They are members of different groups - combine them.
for node in groups[r_group]:
remove_node_groups[node] = l_group
groups[l_group] = groups[l_group].union(groups[r_group])
groups.pop(r_group)
# Some nodes are their own group and don't have connections.
for node in nodes:
if node not in remove_node_groups:
# The node is its own group.
group_num += 1
groups[group_num] = set([node])
remove_node_groups[node] = group_num
# Consolidate all groups with the same in/out edges.
group_edges = {}
for l_node, r_node in incoming_remove_edges:
r_group = remove_node_groups[r_node]
group_edges.setdefault(r_group, [set(), set()])
group_edges[r_group][0].add(l_node)
for l_node, r_node in outgoing_remove_edges:
l_group = remove_node_groups[l_node]
group_edges.setdefault(l_group, [set(), set()])
group_edges[l_group][1].add(r_node)
for group1 in sorted(group_edges):
if group1 not in group_edges:
continue
for group2 in sorted(group_edges):
if (group1 != group2 and
group_edges[group1][0] == group_edges[group2][0] and
group_edges[group1][1] == group_edges[group2][1]):
# Both groups have the same incoming and outgoing edges.
for node in groups[group2]:
remove_node_groups[node] = group1
groups[group1] = groups[group1].union(groups[group2])
groups.pop(group2)
group_edges.pop(group2)
# Create a new node name for the group.
names = set()
index = -1
for group in sorted(groups):
index += 1
name = "__remove_%s__" % index
while name in existing_nodes or name in names:
index += 1
name = "__remove_%s__" % index
group_new_nodes[group] = name
names.add(name)
new_edges = set()
groups_have_outgoing = set()
for l_node, r_node in outgoing_remove_edges:
new_l_group = remove_node_groups[l_node]
new_l_node = group_new_nodes[new_l_group]
new_edges.add((new_l_node, r_node, True, False, False))
groups_have_outgoing.add(new_l_group)
for l_node, r_node in incoming_remove_edges:
new_r_group = remove_node_groups[r_node]
new_r_node = group_new_nodes[new_r_group]
if new_r_group not in groups_have_outgoing:
# Skip any groups that don't have edges on to normal nodes.
continue
new_edges.add((l_node, new_r_node, True, False, False))
self.remove_nodes_from(nodes)
self.add_edges(sorted(new_edges))
def add_edges(self, edges, ignore_suicide=False):
"""Add edges and nodes connected by the edges."""
for edge in sorted(edges):
left, right, skipped, suicide, conditional = edge
if left is None and right is None or suicide and ignore_suicide:
continue
if left is None:
pygraphviz.AGraph.add_node(self, right)
self.style_node(right)
elif right is None:
pygraphviz.AGraph.add_node(self, left)
self.style_node(left)
else:
attrs = {'penwidth': 2}
if skipped:
attrs.update({'style': 'dotted', 'arrowhead': 'oinv'})
elif conditional and suicide:
attrs.update({'style': 'dashed', 'arrowhead': 'odot'})
elif conditional:
attrs.update({'style': 'solid', 'arrowhead': 'onormal'})
elif suicide:
attrs.update({'style': 'dashed', 'arrowhead': 'dot'})
else:
attrs.update({'style': 'solid', 'arrowhead': 'normal'})
pygraphviz.AGraph.add_edge(self, left, right, **attrs)
self.style_node(left)
self.style_node(right)
self.style_edge(left, right)
def add_cycle_point_subgraphs(self, edges):
"""Draw nodes within cycle point groups (subgraphs)."""
point_string_id_map = {}
for edge_entry in edges:
for id_ in edge_entry[:2]:
if id_ is None:
continue
try:
point_string = TaskID.split(id_)[1]
except IndexError:
# Probably a special node - ignore it.
continue
point_string_id_map.setdefault(point_string, [])
point_string_id_map[point_string].append(id_)
for point_string, ids in point_string_id_map.items():
self.add_subgraph(
nbunch=ids, name="cluster_" + point_string,
label=point_string, fontsize=28, rank="max", style="dashed"
)
def add_subgraph(self, nbunch=None, name=None, **attr):
"""Return subgraph induced by nodes in nbunch.
Overrides (but does the same thing as) pygraphviz's
AGraph.add_subgraph method.
"""
name = name.encode(self.encoding)
handle = pygraphviz.graphviz.agsubg(
self.handle, name, 1)
subgraph = pygraphviz.AGraph(
handle=handle, name=name,
strict=self.strict, directed=self.directed,
**attr
)
nodes = self.prepare_nbunch(nbunch)
subgraph.add_nodes_from(nodes)
return subgraph
class CGraph(CGraphPlain):
"""Directed Acyclic Graph class for cylc dependency graphs.
This class automatically adds node and edge attributes
according to the suite.rc file visualization config."""
def __init__(self, title, suite_polling_tasks={}, vizconfig={}):
# suite.rc visualization config section
self.vizconfig = vizconfig
CGraphPlain.__init__(self, title, suite_polling_tasks)
# graph attributes
# - default node attributes
for item in vizconfig['default node attributes']:
attr, value = [val.strip() for val in item.split('=', 1)]
self.node_attr[attr] = value
# - default edge attributes
for item in vizconfig['default edge attributes']:
attr, value = [val.strip() for val in item.split('=', 1)]
self.edge_attr[attr] = value
# non-default node attributes by task name
# TODO - ERROR CHECKING FOR INVALID TASK NAME
self.task_attr = {}
for item in self.vizconfig['node attributes']:
if item in self.vizconfig['node groups']:
# item is a group of tasks
for task in self.vizconfig['node groups'][item]:
# for each task in the group
for attr in self.vizconfig['node attributes'][item]:
if task not in self.task_attr:
self.task_attr[task] = []
self.task_attr[task].append(attr)
else:
# item must be a task name
for attr in self.vizconfig['node attributes'][item]:
if item not in self.task_attr:
self.task_attr[item] = []
self.task_attr[item].append(attr)
def style_node(self, node_string):
super(self.__class__, self).style_node(node_string)
node = self.get_node(node_string)
node.attr['shape'] = 'ellipse' # Default shape.
for item in self.node_attr_by_taskname(node_string):
attr, value = [val.strip() for val in item.split('=', 1)]
node.attr[attr] = value
if self.vizconfig['use node color for labels']:
node.attr['fontcolor'] = node.attr['color']
node.attr['penwidth'] = self.vizconfig['node penwidth']
def style_edge(self, left, right):
super(self.__class__, self).style_edge(left, right)
left_node = self.get_node(left)
edge = self.get_edge(left, right)
if self.vizconfig['use node color for edges']:
edge.attr['color'] = left_node.attr['color']
elif self.vizconfig['use node fillcolor for edges']:
if left_node.attr['style'] == 'filled':
edge.attr['color'] = left_node.attr['fillcolor']
edge.attr['penwidth'] = self.vizconfig['edge penwidth']
@classmethod
def get_graph(
cls, suiterc, group_nodes=None, ungroup_nodes=None,
ungroup_recursive=False, group_all=False, ungroup_all=False,
ignore_suicide=False, subgraphs_on=False):
"""Return dependency graph."""
# Use visualization settings.
start_point_string = (
suiterc.cfg['visualization']['initial cycle point'])
# Use visualization settings in absence of final cycle point definition
# when not validating (stops slowdown of validation due to vis
# settings)
stop_point = None
vfcp = suiterc.cfg['visualization']['final cycle point']
if vfcp:
try:
stop_point = get_point_relative(
vfcp, get_point(start_point_string)).standardise()
except ValueError:
stop_point = get_point(vfcp).standardise()
if stop_point is not None:
if stop_point < get_point(start_point_string):
# Avoid a null graph.
stop_point_string = start_point_string
else:
stop_point_string = str(stop_point)
else:
stop_point_string = None
graph = cls(
suiterc.suite,
suiterc.suite_polling_tasks,
suiterc.cfg['visualization'])
gr_edges = suiterc.get_graph_raw(
start_point_string, stop_point_string,
group_nodes, ungroup_nodes, ungroup_recursive,
group_all, ungroup_all)
graph.add_edges(gr_edges, ignore_suicide)
if subgraphs_on:
graph.add_cycle_point_subgraphs(gr_edges)
return graph
|