/usr/lib/python3/dist-packages/engineio/packet.py is in python3-engineio 1.6.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | import base64
import json as _json
import six
(OPEN, CLOSE, PING, PONG, MESSAGE, UPGRADE, NOOP) = (0, 1, 2, 3, 4, 5, 6)
packet_names = ['OPEN', 'CLOSE', 'PING', 'PONG', 'MESSAGE', 'UPGRADE', 'NOOP']
binary_types = (six.binary_type, bytearray)
class Packet(object):
"""Engine.IO packet."""
json = _json
def __init__(self, packet_type=NOOP, data=None, binary=None,
encoded_packet=None):
self.packet_type = packet_type
self.data = data
if binary is not None:
self.binary = binary
elif isinstance(data, six.text_type):
self.binary = False
elif isinstance(data, binary_types):
self.binary = True
else:
self.binary = False
if encoded_packet:
self.decode(encoded_packet)
def encode(self, b64=False, always_bytes=True):
"""Encode the packet for transmission."""
if self.binary and not b64:
encoded_packet = six.int2byte(self.packet_type)
else:
encoded_packet = six.text_type(self.packet_type)
if self.binary and b64:
encoded_packet = 'b' + encoded_packet
if self.binary:
if b64:
encoded_packet += base64.b64encode(self.data).decode('utf-8')
else:
encoded_packet += self.data
elif isinstance(self.data, six.string_types):
encoded_packet += self.data
elif isinstance(self.data, dict) or isinstance(self.data, list):
encoded_packet += self.json.dumps(self.data,
separators=(',', ':'))
elif self.data is not None:
encoded_packet += str(self.data)
if always_bytes and not isinstance(encoded_packet, binary_types):
encoded_packet = encoded_packet.encode('utf-8')
return encoded_packet
def decode(self, encoded_packet):
"""Decode a transmitted package."""
b64 = False
if not isinstance(encoded_packet, binary_types):
encoded_packet = encoded_packet.encode('utf-8')
elif not isinstance(encoded_packet, bytes):
encoded_packet = bytes(encoded_packet)
self.packet_type = six.byte2int(encoded_packet[0:1])
if self.packet_type == 98: # 'b' --> binary base64 encoded packet
self.binary = True
encoded_packet = encoded_packet[1:]
self.packet_type = six.byte2int(encoded_packet[0:1])
self.packet_type -= 48
b64 = True
elif self.packet_type >= 48:
self.packet_type -= 48
self.binary = False
else:
self.binary = True
self.data = None
if len(encoded_packet) > 1:
if self.binary:
if b64:
self.data = base64.b64decode(encoded_packet[1:])
else:
self.data = encoded_packet[1:]
else:
try:
self.data = self.json.loads(
encoded_packet[1:].decode('utf-8'))
except ValueError:
self.data = encoded_packet[1:].decode('utf-8')
|