/usr/share/pyshared/igraph/remote/gephi.py is in python-igraph 0.6.5-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# vim:ts=4:sw=4:sts=4:et
# -*- coding: utf-8 -*-
"""Classes that help igraph communicate with Gephi (http://www.gephi.org)."""
from igraph.compat import property
import urllib2
try:
# JSON is optional so we don't blow up with Python < 2.6
import json
except ImportError:
try:
# Try with simplejson for Python < 2.6
import simplejson as json
except ImportError:
# No simplejson either
from igraph.drawing.utils import FakeModule
json = FakeModule()
__all__ = ["GephiConnection", "GephiGraphStreamer", "GephiGraphStreamingAPIFormat"]
__docformat__ = "restructuredtext en"
__license__ = u"""\
Copyright (C) 2006-2012 Tamás Nepusz <ntamas@gmail.com>
Pázmány Péter sétány 1/a, 1117 Budapest, Hungary
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301 USA
"""
class GephiConnection(object):
    """Object that represents a connection to a Gephi master server.

    Data passed to `write()` is buffered in memory and sent to the server in
    a single HTTP POST request when `flush()` is called, or automatically
    when the number of buffered chunks reaches the autoflush threshold.
    """

    def __init__(self, url=None, host="127.0.0.1", port=8080, workspace=0):
        """Constructs a connection to a Gephi master server.

        The connection object can be constructed either by specifying the `url`
        directly, or by specifying the `host`, `port` and `workspace` arguments.
        The latter three are evaluated only if `url` is None (or empty);
        otherwise the `url` will take precedence.

        The `url` argument does not have to include the operation (e.g.,
        ``?operation=updateGraph``); the connection will take care of it.
        E.g., if you wish to connect to workspace 2 in a local Gephi instance on
        port 7341, the correct form to use for the `url` is as follows::

            http://localhost:7341/workspace2
        """
        # Chunks of raw data waiting to be sent in the next request
        self._pending_operations = []
        # Number of buffered chunks that triggers an automatic flush in write()
        self._autoflush_threshold = 1024
        self.url = url or self._construct_default_url(host, port, workspace)

    def __del__(self):
        """Flushes the pending operations when the connection is destroyed."""
        try:
            self.close()
        except urllib2.URLError:
            # Happens when Gephi is closed before the connection is destroyed
            pass

    def _construct_default_url(self, host, port, workspace):
        """Builds the workspace URL from the given host, port and workspace
        index."""
        return "http://%s:%d/workspace%d" % (host, port, workspace)

    def close(self):
        """Closes the connection by flushing all the pending operations to
        the Gephi master server."""
        self.flush()

    def flush(self):
        """Flushes all the pending operations to the Gephi master server in a
        single request.

        Returns the body of the response sent by the server, or an empty
        string if there was nothing to send; no request is made in the latter
        case.

        The buffer is emptied before the request is sent, so the operations
        are not retried if the request fails.
        """
        if not self._pending_operations:
            # Nothing was written since the last flush; avoid a pointless
            # round-trip to the server (this is the common case in __del__)
            return ""

        data = "".join(self._pending_operations)
        self._pending_operations = []
        conn = urllib2.urlopen(self._update_url, data=data)
        try:
            return conn.read()
        finally:
            # Release the underlying socket even if reading the response fails
            conn.close()

    @property
    def url(self):
        """The URL of the Gephi workspace where the data will be sent."""
        return self._url_root

    @url.setter
    def url(self, value):
        self._url_root = value
        # Derive the operation-specific URLs from the workspace URL
        self._get_url = self._url_root + "?operation=getGraph"
        self._update_url = self._url_root + "?operation=updateGraph"

    def write(self, data):
        """Buffers the given raw data for the Gephi streaming master server.

        The data is sent in an HTTP POST request at the next call to `flush()`.
        A flush is triggered automatically when the number of buffered chunks
        reaches the autoflush threshold.
        """
        self._pending_operations.append(data)
        if len(self._pending_operations) >= self._autoflush_threshold:
            self.flush()

    def __repr__(self):
        return "%s(url=%r)" % (self.__class__.__name__, self.url)
class GephiGraphStreamingAPIFormat(object):
    """Object that implements the Gephi graph streaming API format and returns
    Python objects corresponding to the events defined in the API.

    Every method returns a dictionary with a single key naming the event type
    (``an``, ``ae``, ``cn``, ``ce``, ``dn`` or ``de``); the corresponding value
    maps the identifier of the affected node or edge to its attributes.
    """

    def get_add_node_event(self, identifier, attributes=None):
        """Generates a Python object corresponding to the event that adds a node
        with the given identifier and attributes in the Gephi graph streaming API.

        When `attributes` is omitted or ``None``, the node gets a new, empty
        attribute dictionary. A dictionary passed in by the caller is embedded
        in the result as is (it is not copied).

        Example::

            >>> api = GephiGraphStreamingAPIFormat()
            >>> api.get_add_node_event("spam")
            {'an': {'spam': {}}}
            >>> api.get_add_node_event("spam", dict(ham="eggs"))
            {'an': {'spam': {'ham': 'eggs'}}}
        """
        if attributes is None:
            # Create a fresh dict for every call so that events never share
            # (and never leak modifications into) a common default object
            attributes = {}
        return {"an": {identifier: attributes}}

    def get_add_edge_event(self, identifier, source, target, directed, attributes=None):
        """Generates a Python object corresponding to the event that adds an edge
        with the given source, target, directedness and attributes in the Gephi
        graph streaming API.

        The given attributes are copied; the ``source``, ``target`` and
        ``directed`` keys of the copy are then set from the corresponding
        arguments, overriding any attributes with the same names. `directed`
        is converted to a boolean.
        """
        result = {} if attributes is None else dict(attributes)
        result["source"] = source
        result["target"] = target
        result["directed"] = bool(directed)
        return {"ae": {identifier: result}}

    def get_change_node_event(self, identifier, attributes):
        """Generates a Python object corresponding to the event that changes the
        attributes of some node in the Gephi graph streaming API. The given attributes
        are merged into the existing ones; use ``None`` as the attribute value to
        delete a given attribute.

        Example::

            >>> api = GephiGraphStreamingAPIFormat()
            >>> api.get_change_node_event("spam", dict(ham="eggs"))
            {'cn': {'spam': {'ham': 'eggs'}}}
            >>> api.get_change_node_event("spam", dict(ham=None))
            {'cn': {'spam': {'ham': None}}}
        """
        return {"cn": {identifier: attributes}}

    def get_change_edge_event(self, identifier, attributes):
        """Generates a Python object corresponding to the event that changes the
        attributes of some edge in the Gephi graph streaming API. The given attributes
        are merged into the existing ones; use ``None`` as the attribute value to
        delete a given attribute.

        Example::

            >>> api = GephiGraphStreamingAPIFormat()
            >>> api.get_change_edge_event("spam", dict(ham="eggs"))
            {'ce': {'spam': {'ham': 'eggs'}}}
            >>> api.get_change_edge_event("spam", dict(ham=None))
            {'ce': {'spam': {'ham': None}}}
        """
        return {"ce": {identifier: attributes}}

    def get_delete_node_event(self, identifier):
        """Generates a Python object corresponding to the event that deletes a
        node with the given identifier in the Gephi graph streaming API.

        Example::

            >>> api = GephiGraphStreamingAPIFormat()
            >>> api.get_delete_node_event("spam")
            {'dn': {'spam': {}}}
        """
        return {"dn": {identifier: {}}}

    def get_delete_edge_event(self, identifier):
        """Generates a Python object corresponding to the event that deletes an
        edge with the given identifier in the Gephi graph streaming API.

        Example::

            >>> api = GephiGraphStreamingAPIFormat()
            >>> api.get_delete_edge_event("spam:ham")
            {'de': {'spam:ham': {}}}
        """
        return {"de": {identifier: {}}}
class GephiGraphStreamer(object):
    """Class that produces JSON event objects that stream an igraph graph to
    Gephi using the Gephi Graph Streaming API.

    The Gephi graph streaming format is a simple JSON-based format that can be
    used to post mutations to a graph (i.e. node and edge additions, removals
    and updates) to a remote component. For instance, one can open up Gephi
    (http://www.gephi.org), install the Gephi graph streaming plugin and then
    send a graph from igraph straight into the Gephi window by using
    `GephiGraphStreamer` with the appropriate URL where Gephi is listening.

    Example::

        >>> from cStringIO import StringIO
        >>> from igraph import Graph
        >>> buf = StringIO()
        >>> streamer = GephiGraphStreamer()
        >>> graph = Graph.Formula("A --> B, B --> C")
        >>> streamer.post(graph, buf)
        >>> print buf.getvalue()        # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
        {"an": {"igraph:...:v:0": {"name": "A"}}}
        {"an": {"igraph:...:v:1": {"name": "B"}}}
        {"an": {"igraph:...:v:2": {"name": "C"}}}
        {"ae": {"igraph:...:e:0:1": {...}}}
        {"ae": {"igraph:...:e:1:2": {...}}}
        <BLANKLINE>
    """

    def __init__(self, encoder=None):
        """Constructs a Gephi graph streamer that will post graphs to a
        given file-like object or a Gephi connection.

        `encoder` must either be ``None`` or an instance of ``json.JSONEncoder``;
        it is the JSON encoder that will be used by default when posting JSON
        objects. When it is ``None``, an ASCII-only ``json.JSONEncoder`` is
        created.
        """
        if not encoder:
            encoder = json.JSONEncoder(ensure_ascii=True)
        self.encoder = encoder
        self.format = GephiGraphStreamingAPIFormat()

    def iterjsonobj(self, graph):
        """Iterates over the JSON objects that build up the graph using the
        Gephi graph streaming API.

        One "add node" event is yielded for every vertex first, followed by
        one "add edge" event for every edge. The objects yielded are plain
        Python objects; they must be formatted with ``json.dumps`` (or a JSON
        encoder) before sending them to the destination.
        """
        # Identifiers are prefixed with the id() of the graph object
        prefix = "igraph:%s" % (hex(id(graph)), )

        def vertex_id(index):
            return "%s:v:%d" % (prefix, index)

        # Vertices come first...
        make_node_event = self.format.get_add_node_event
        for v in graph.vs:
            yield make_node_event(vertex_id(v.index), v.attributes())

        # ...then the edges. Note that the edge identifier is derived from the
        # endpoints only.
        make_edge_event = self.format.get_add_edge_event
        is_directed = graph.is_directed()
        for e in graph.es:
            src, dst = e.source, e.target
            yield make_edge_event(
                "%s:e:%d:%d" % (prefix, src, dst),
                vertex_id(src),
                vertex_id(dst),
                is_directed,
                e.attributes()
            )

    def post(self, graph, destination, encoder=None):
        """Posts the given graph to the given destination, then flushes the
        destination once.

        `destination` must be a file-like object or an instance of
        `GephiConnection`. When `encoder` is ``None``, the default JSON encoder
        of the streamer (stored in the `encoder` property) is used.
        """
        if not encoder:
            encoder = self.encoder
        for event in self.iterjsonobj(graph):
            # Individual events are not flushed; a single flush follows below
            self.send_event(event, destination, encoder=encoder, flush=False)
        destination.flush()

    def send_event(self, event, destination, encoder=None, flush=True):
        """Sends a single JSON event to the given destination.

        `destination` must be a file-like object or an instance of
        `GephiConnection`. When `encoder` is ``None``, the default JSON encoder
        of the streamer (stored in the `encoder` property) is used.

        The encoded event is written first, followed by a ``\\r\\n`` terminator
        in a separate write. The destination is flushed afterwards unless
        `flush` is ``False``, which is useful when many events are sent in a
        row.
        """
        if not encoder:
            encoder = self.encoder
        payload = encoder.encode(event)
        destination.write(payload)
        destination.write("\r\n")
        if flush:
            destination.flush()
|