/usr/share/pyshared/socketio/mixins.py is in python-socketio 0.3.6-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | """
These are general-purpose Mixins -- for use with Namespaces -- that are
generally useful for most simple projects, e.g. Rooms, Broadcast.
You'll likely want to create your own Mixins.
"""
class RoomsMixin(object):
def __init__(self, *args, **kwargs):
super(RoomsMixin, self).__init__(*args, **kwargs)
if 'rooms' not in self.session:
self.session['rooms'] = set() # a set of simple strings
def join(self, room):
"""Lets a user join a room on a specific Namespace."""
self.session['rooms'].add(self._get_room_name(room))
def leave(self, room):
"""Lets a user leave a room on a specific Namespace."""
self.session['rooms'].remove(self._get_room_name(room))
def _get_room_name(self, room):
return self.ns_name + '_' + room
def emit_to_room(self, room, event, *args):
"""This is sent to all in the room (in this particular Namespace)"""
pkt = dict(type="event",
name=event,
args=args,
endpoint=self.ns_name)
room_name = self._get_room_name(room)
for sessid, socket in self.socket.server.sockets.iteritems():
if 'rooms' not in socket.session:
continue
if room_name in socket.session['rooms'] and self.socket != socket:
socket.send_packet(pkt)
class BroadcastMixin(object):
"""Mix in this class with your Namespace to have a broadcast event method.
Use it like this:
class MyNamespace(BaseNamespace, BroadcastMixin):
def on_chatmsg(self, event):
self.broadcast_event('chatmsg', event)
"""
def broadcast_event(self, event, *args):
"""
This is sent to all in the sockets in this particular Namespace,
including itself.
"""
pkt = dict(type="event",
name=event,
args=args,
endpoint=self.ns_name)
for sessid, socket in self.socket.server.sockets.iteritems():
socket.send_packet(pkt)
def broadcast_event_not_me(self, event, *args):
"""
This is sent to all in the sockets in this particular Namespace,
except itself.
"""
pkt = dict(type="event",
name=event,
args=args,
endpoint=self.ns_name)
for sessid, socket in self.socket.server.sockets.iteritems():
if socket is not self.socket:
socket.send_packet(pkt)
|