/usr/share/pyshared/mod_pywebsocket/mux.py is in python-mod-pywebsocket 0.7.9-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 | # Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""This file provides classes and helper functions for multiplexing extension.
Specification:
http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-06
"""
import collections
import copy
import email
import email.parser
import logging
import math
import struct
import threading
import traceback
from mod_pywebsocket import common
from mod_pywebsocket import handshake
from mod_pywebsocket import util
from mod_pywebsocket._stream_base import BadOperationException
from mod_pywebsocket._stream_base import ConnectionTerminatedException
from mod_pywebsocket._stream_base import InvalidFrameException
from mod_pywebsocket._stream_hybi import Frame
from mod_pywebsocket._stream_hybi import Stream
from mod_pywebsocket._stream_hybi import StreamOptions
from mod_pywebsocket._stream_hybi import create_binary_frame
from mod_pywebsocket._stream_hybi import create_closing_handshake_body
from mod_pywebsocket._stream_hybi import create_header
from mod_pywebsocket._stream_hybi import create_length_header
from mod_pywebsocket._stream_hybi import parse_frame
from mod_pywebsocket.handshake import hybi
_CONTROL_CHANNEL_ID = 0
_DEFAULT_CHANNEL_ID = 1
_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
_MUX_OPCODE_FLOW_CONTROL = 2
_MUX_OPCODE_DROP_CHANNEL = 3
_MUX_OPCODE_NEW_CHANNEL_SLOT = 4
_MAX_CHANNEL_ID = 2 ** 29 - 1
_INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64
_INITIAL_QUOTA_FOR_CLIENT = 8 * 1024
_HANDSHAKE_ENCODING_IDENTITY = 0
_HANDSHAKE_ENCODING_DELTA = 1
# We need only these status code for now.
_HTTP_BAD_RESPONSE_MESSAGES = {
common.HTTP_STATUS_BAD_REQUEST: 'Bad Request',
}
# DropChannel reason code
# TODO(bashi): Define all reason code defined in -05 draft.
_DROP_CODE_NORMAL_CLOSURE = 1000
_DROP_CODE_INVALID_ENCAPSULATING_MESSAGE = 2001
_DROP_CODE_CHANNEL_ID_TRUNCATED = 2002
_DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED = 2003
_DROP_CODE_UNKNOWN_MUX_OPCODE = 2004
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK = 2005
_DROP_CODE_CHANNEL_ALREADY_EXISTS = 2006
_DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION = 2007
_DROP_CODE_UNKNOWN_REQUEST_ENCODING = 2010
_DROP_CODE_SEND_QUOTA_VIOLATION = 3005
_DROP_CODE_SEND_QUOTA_OVERFLOW = 3006
_DROP_CODE_ACKNOWLEDGED = 3008
_DROP_CODE_BAD_FRAGMENTATION = 3009
class MuxUnexpectedException(Exception):
"""Exception in handling multiplexing extension."""
pass
# Temporary
class MuxNotImplementedException(Exception):
"""Raised when a flow enters unimplemented code path."""
pass
class LogicalConnectionClosedException(Exception):
"""Raised when logical connection is gracefully closed."""
pass
class PhysicalConnectionError(Exception):
"""Raised when there is a physical connection error."""
def __init__(self, drop_code, message=''):
super(PhysicalConnectionError, self).__init__(
'code=%d, message=%r' % (drop_code, message))
self.drop_code = drop_code
self.message = message
class LogicalChannelError(Exception):
"""Raised when there is a logical channel error."""
def __init__(self, channel_id, drop_code, message=''):
super(LogicalChannelError, self).__init__(
'channel_id=%d, code=%d, message=%r' % (
channel_id, drop_code, message))
self.channel_id = channel_id
self.drop_code = drop_code
self.message = message
def _encode_channel_id(channel_id):
if channel_id < 0:
raise ValueError('Channel id %d must not be negative' % channel_id)
if channel_id < 2 ** 7:
return chr(channel_id)
if channel_id < 2 ** 14:
return struct.pack('!H', 0x8000 + channel_id)
if channel_id < 2 ** 21:
first = chr(0xc0 + (channel_id >> 16))
return first + struct.pack('!H', channel_id & 0xffff)
if channel_id < 2 ** 29:
return struct.pack('!L', 0xe0000000 + channel_id)
raise ValueError('Channel id %d is too large' % channel_id)
def _encode_number(number):
return create_length_header(number, False)
def _create_add_channel_response(channel_id, encoded_handshake,
encoding=0, rejected=False):
if encoding != 0 and encoding != 1:
raise ValueError('Invalid encoding %d' % encoding)
first_byte = ((_MUX_OPCODE_ADD_CHANNEL_RESPONSE << 5) |
(rejected << 4) | encoding)
block = (chr(first_byte) +
_encode_channel_id(channel_id) +
_encode_number(len(encoded_handshake)) +
encoded_handshake)
return block
def _create_drop_channel(channel_id, code=None, message=''):
if len(message) > 0 and code is None:
raise ValueError('Code must be specified if message is specified')
first_byte = _MUX_OPCODE_DROP_CHANNEL << 5
block = chr(first_byte) + _encode_channel_id(channel_id)
if code is None:
block += _encode_number(0) # Reason size
else:
reason = struct.pack('!H', code) + message
reason_size = _encode_number(len(reason))
block += reason_size + reason
return block
def _create_flow_control(channel_id, replenished_quota):
first_byte = _MUX_OPCODE_FLOW_CONTROL << 5
block = (chr(first_byte) +
_encode_channel_id(channel_id) +
_encode_number(replenished_quota))
return block
def _create_new_channel_slot(slots, send_quota):
if slots < 0 or send_quota < 0:
raise ValueError('slots and send_quota must be non-negative.')
first_byte = _MUX_OPCODE_NEW_CHANNEL_SLOT << 5
block = (chr(first_byte) +
_encode_number(slots) +
_encode_number(send_quota))
return block
def _create_fallback_new_channel_slot():
first_byte = (_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | 1 # Set the F flag
block = (chr(first_byte) + _encode_number(0) + _encode_number(0))
return block
def _parse_request_text(request_text):
request_line, header_lines = request_text.split('\r\n', 1)
words = request_line.split(' ')
if len(words) != 3:
raise ValueError('Bad Request-Line syntax %r' % request_line)
[command, path, version] = words
if version != 'HTTP/1.1':
raise ValueError('Bad request version %r' % version)
# email.parser.Parser() parses RFC 2822 (RFC 822) style headers.
# RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers
# RFC 822.
headers = email.parser.Parser().parsestr(header_lines)
return command, path, version, headers
class _ControlBlock(object):
"""A structure that holds parsing result of multiplexing control block.
Control block specific attributes will be added by _MuxFramePayloadParser.
(e.g. encoded_handshake will be added for AddChannelRequest and
AddChannelResponse)
"""
def __init__(self, opcode):
self.opcode = opcode
class _MuxFramePayloadParser(object):
"""A class that parses multiplexed frame payload."""
def __init__(self, payload):
self._data = payload
self._read_position = 0
self._logger = util.get_class_logger(self)
def read_channel_id(self):
"""Reads channel id.
Raises:
ValueError: when the payload doesn't contain
valid channel id.
"""
remaining_length = len(self._data) - self._read_position
pos = self._read_position
if remaining_length == 0:
raise ValueError('Invalid channel id format')
channel_id = ord(self._data[pos])
channel_id_length = 1
if channel_id & 0xe0 == 0xe0:
if remaining_length < 4:
raise ValueError('Invalid channel id format')
channel_id = struct.unpack('!L',
self._data[pos:pos+4])[0] & 0x1fffffff
channel_id_length = 4
elif channel_id & 0xc0 == 0xc0:
if remaining_length < 3:
raise ValueError('Invalid channel id format')
channel_id = (((channel_id & 0x1f) << 16) +
struct.unpack('!H', self._data[pos+1:pos+3])[0])
channel_id_length = 3
elif channel_id & 0x80 == 0x80:
if remaining_length < 2:
raise ValueError('Invalid channel id format')
channel_id = struct.unpack('!H',
self._data[pos:pos+2])[0] & 0x3fff
channel_id_length = 2
self._read_position += channel_id_length
return channel_id
def read_inner_frame(self):
"""Reads an inner frame.
Raises:
PhysicalConnectionError: when the inner frame is invalid.
"""
if len(self._data) == self._read_position:
raise PhysicalConnectionError(
_DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED)
bits = ord(self._data[self._read_position])
self._read_position += 1
fin = (bits & 0x80) == 0x80
rsv1 = (bits & 0x40) == 0x40
rsv2 = (bits & 0x20) == 0x20
rsv3 = (bits & 0x10) == 0x10
opcode = bits & 0xf
payload = self.remaining_data()
# Consume rest of the message which is payload data of the original
# frame.
self._read_position = len(self._data)
return fin, rsv1, rsv2, rsv3, opcode, payload
def _read_number(self):
if self._read_position + 1 > len(self._data):
raise ValueError(
'Cannot read the first byte of number field')
number = ord(self._data[self._read_position])
if number & 0x80 == 0x80:
raise ValueError(
'The most significant bit of the first byte of number should '
'be unset')
self._read_position += 1
pos = self._read_position
if number == 127:
if pos + 8 > len(self._data):
raise ValueError('Invalid number field')
self._read_position += 8
number = struct.unpack('!Q', self._data[pos:pos+8])[0]
if number > 0x7FFFFFFFFFFFFFFF:
raise ValueError('Encoded number(%d) >= 2^63' % number)
if number <= 0xFFFF:
raise ValueError(
'%d should not be encoded by 9 bytes encoding' % number)
return number
if number == 126:
if pos + 2 > len(self._data):
raise ValueError('Invalid number field')
self._read_position += 2
number = struct.unpack('!H', self._data[pos:pos+2])[0]
if number <= 125:
raise ValueError(
'%d should not be encoded by 3 bytes encoding' % number)
return number
def _read_size_and_contents(self):
"""Reads data that consists of followings:
- the size of the contents encoded the same way as payload length
of the WebSocket Protocol with 1 bit padding at the head.
- the contents.
"""
try:
size = self._read_number()
except ValueError, e:
raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
str(e))
pos = self._read_position
if pos + size > len(self._data):
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Cannot read %d bytes data' % size)
self._read_position += size
return self._data[pos:pos+size]
def _read_add_channel_request(self, first_byte, control_block):
reserved = (first_byte >> 2) & 0x7
if reserved != 0:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Reserved bits must be unset')
# Invalid encoding will be handled by MuxHandler.
encoding = first_byte & 0x3
try:
control_block.channel_id = self.read_channel_id()
except ValueError, e:
raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
control_block.encoding = encoding
encoded_handshake = self._read_size_and_contents()
control_block.encoded_handshake = encoded_handshake
return control_block
def _read_add_channel_response(self, first_byte, control_block):
reserved = (first_byte >> 2) & 0x3
if reserved != 0:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Reserved bits must be unset')
control_block.accepted = (first_byte >> 4) & 1
control_block.encoding = first_byte & 0x3
try:
control_block.channel_id = self.read_channel_id()
except ValueError, e:
raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
control_block.encoded_handshake = self._read_size_and_contents()
return control_block
def _read_flow_control(self, first_byte, control_block):
reserved = first_byte & 0x1f
if reserved != 0:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Reserved bits must be unset')
try:
control_block.channel_id = self.read_channel_id()
control_block.send_quota = self._read_number()
except ValueError, e:
raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
str(e))
return control_block
def _read_drop_channel(self, first_byte, control_block):
reserved = first_byte & 0x1f
if reserved != 0:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Reserved bits must be unset')
try:
control_block.channel_id = self.read_channel_id()
except ValueError, e:
raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
reason = self._read_size_and_contents()
if len(reason) == 0:
control_block.drop_code = None
control_block.drop_message = ''
elif len(reason) >= 2:
control_block.drop_code = struct.unpack('!H', reason[:2])[0]
control_block.drop_message = reason[2:]
else:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Received DropChannel that conains only 1-byte reason')
return control_block
def _read_new_channel_slot(self, first_byte, control_block):
reserved = first_byte & 0x1e
if reserved != 0:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Reserved bits must be unset')
control_block.fallback = first_byte & 1
try:
control_block.slots = self._read_number()
control_block.send_quota = self._read_number()
except ValueError, e:
raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
str(e))
return control_block
def read_control_blocks(self):
"""Reads control block(s).
Raises:
PhysicalConnectionError: when the payload contains invalid control
block(s).
StopIteration: when no control blocks left.
"""
while self._read_position < len(self._data):
first_byte = ord(self._data[self._read_position])
self._read_position += 1
opcode = (first_byte >> 5) & 0x7
control_block = _ControlBlock(opcode=opcode)
if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
yield self._read_add_channel_request(first_byte, control_block)
elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
yield self._read_add_channel_response(
first_byte, control_block)
elif opcode == _MUX_OPCODE_FLOW_CONTROL:
yield self._read_flow_control(first_byte, control_block)
elif opcode == _MUX_OPCODE_DROP_CHANNEL:
yield self._read_drop_channel(first_byte, control_block)
elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
yield self._read_new_channel_slot(first_byte, control_block)
else:
raise PhysicalConnectionError(
_DROP_CODE_UNKNOWN_MUX_OPCODE,
'Invalid opcode %d' % opcode)
assert self._read_position == len(self._data)
raise StopIteration
def remaining_data(self):
"""Returns remaining data."""
return self._data[self._read_position:]
class _LogicalRequest(object):
"""Mimics mod_python request."""
def __init__(self, channel_id, command, path, protocol, headers,
connection):
"""Constructs an instance.
Args:
channel_id: the channel id of the logical channel.
command: HTTP request command.
path: HTTP request path.
headers: HTTP headers.
connection: _LogicalConnection instance.
"""
self.channel_id = channel_id
self.method = command
self.uri = path
self.protocol = protocol
self.headers_in = headers
self.connection = connection
self.server_terminated = False
self.client_terminated = False
def is_https(self):
"""Mimics request.is_https(). Returns False because this method is
used only by old protocols (hixie and hybi00).
"""
return False
class _LogicalConnection(object):
"""Mimics mod_python mp_conn."""
# For details, see the comment of set_read_state().
STATE_ACTIVE = 1
STATE_GRACEFULLY_CLOSED = 2
STATE_TERMINATED = 3
def __init__(self, mux_handler, channel_id):
"""Constructs an instance.
Args:
mux_handler: _MuxHandler instance.
channel_id: channel id of this connection.
"""
self._mux_handler = mux_handler
self._channel_id = channel_id
self._incoming_data = ''
# - Protects _waiting_write_completion
# - Signals the thread waiting for completion of write by mux handler
self._write_condition = threading.Condition()
self._waiting_write_completion = False
self._read_condition = threading.Condition()
self._read_state = self.STATE_ACTIVE
def get_local_addr(self):
"""Getter to mimic mp_conn.local_addr."""
return self._mux_handler.physical_connection.get_local_addr()
local_addr = property(get_local_addr)
def get_remote_addr(self):
"""Getter to mimic mp_conn.remote_addr."""
return self._mux_handler.physical_connection.get_remote_addr()
remote_addr = property(get_remote_addr)
def get_memorized_lines(self):
"""Gets memorized lines. Not supported."""
raise MuxUnexpectedException('_LogicalConnection does not support '
'get_memorized_lines')
def write(self, data):
"""Writes data. mux_handler sends data asynchronously. The caller will
be suspended until write done.
Args:
data: data to be written.
Raises:
MuxUnexpectedException: when called before finishing the previous
write.
"""
try:
self._write_condition.acquire()
if self._waiting_write_completion:
raise MuxUnexpectedException(
'Logical connection %d is already waiting the completion '
'of write' % self._channel_id)
self._waiting_write_completion = True
self._mux_handler.send_data(self._channel_id, data)
self._write_condition.wait()
# TODO(tyoshino): Raise an exception if woke up by on_writer_done.
finally:
self._write_condition.release()
def write_control_data(self, data):
"""Writes data via the control channel. Don't wait finishing write
because this method can be called by mux dispatcher.
Args:
data: data to be written.
"""
self._mux_handler.send_control_data(data)
def on_write_data_done(self):
"""Called when sending data is completed."""
try:
self._write_condition.acquire()
if not self._waiting_write_completion:
raise MuxUnexpectedException(
'Invalid call of on_write_data_done for logical '
'connection %d' % self._channel_id)
self._waiting_write_completion = False
self._write_condition.notify()
finally:
self._write_condition.release()
def on_writer_done(self):
"""Called by the mux handler when the writer thread has finished."""
try:
self._write_condition.acquire()
self._waiting_write_completion = False
self._write_condition.notify()
finally:
self._write_condition.release()
def append_frame_data(self, frame_data):
"""Appends incoming frame data. Called when mux_handler dispatches
frame data to the corresponding application.
Args:
frame_data: incoming frame data.
"""
self._read_condition.acquire()
self._incoming_data += frame_data
self._read_condition.notify()
self._read_condition.release()
def read(self, length):
"""Reads data. Blocks until enough data has arrived via physical
connection.
Args:
length: length of data to be read.
Raises:
LogicalConnectionClosedException: when closing handshake for this
logical channel has been received.
ConnectionTerminatedException: when the physical connection has
closed, or an error is caused on the reader thread.
"""
self._read_condition.acquire()
while (self._read_state == self.STATE_ACTIVE and
len(self._incoming_data) < length):
self._read_condition.wait()
try:
if self._read_state == self.STATE_GRACEFULLY_CLOSED:
raise LogicalConnectionClosedException(
'Logical channel %d has closed.' % self._channel_id)
elif self._read_state == self.STATE_TERMINATED:
raise ConnectionTerminatedException(
'Receiving %d byte failed. Logical channel (%d) closed' %
(length, self._channel_id))
value = self._incoming_data[:length]
self._incoming_data = self._incoming_data[length:]
finally:
self._read_condition.release()
return value
def set_read_state(self, new_state):
"""Sets the state of this connection. Called when an event for this
connection has occurred.
Args:
new_state: state to be set. new_state must be one of followings:
- STATE_GRACEFULLY_CLOSED: when closing handshake for this
connection has been received.
- STATE_TERMINATED: when the physical connection has closed or
DropChannel of this connection has received.
"""
self._read_condition.acquire()
self._read_state = new_state
self._read_condition.notify()
self._read_condition.release()
class _InnerMessage(object):
"""Holds the result of _InnerMessageBuilder.build().
"""
def __init__(self, opcode, payload):
self.opcode = opcode
self.payload = payload
class _InnerMessageBuilder(object):
"""A class that holds the context of inner message fragmentation and
builds a message from fragmented inner frame(s).
"""
def __init__(self):
self._control_opcode = None
self._pending_control_fragments = []
self._message_opcode = None
self._pending_message_fragments = []
self._frame_handler = self._handle_first
def _handle_first(self, frame):
if frame.opcode == common.OPCODE_CONTINUATION:
raise InvalidFrameException('Sending invalid continuation opcode')
if common.is_control_opcode(frame.opcode):
return self._process_first_fragmented_control(frame)
else:
return self._process_first_fragmented_message(frame)
def _process_first_fragmented_control(self, frame):
self._control_opcode = frame.opcode
self._pending_control_fragments.append(frame.payload)
if not frame.fin:
self._frame_handler = self._handle_fragmented_control
return None
return self._reassemble_fragmented_control()
def _process_first_fragmented_message(self, frame):
self._message_opcode = frame.opcode
self._pending_message_fragments.append(frame.payload)
if not frame.fin:
self._frame_handler = self._handle_fragmented_message
return None
return self._reassemble_fragmented_message()
def _handle_fragmented_control(self, frame):
if frame.opcode != common.OPCODE_CONTINUATION:
raise InvalidFrameException(
'Sending invalid opcode %d while sending fragmented control '
'message' % frame.opcode)
self._pending_control_fragments.append(frame.payload)
if not frame.fin:
return None
return self._reassemble_fragmented_control()
def _reassemble_fragmented_control(self):
opcode = self._control_opcode
payload = ''.join(self._pending_control_fragments)
self._control_opcode = None
self._pending_control_fragments = []
if self._message_opcode is not None:
self._frame_handler = self._handle_fragmented_message
else:
self._frame_handler = self._handle_first
return _InnerMessage(opcode, payload)
def _handle_fragmented_message(self, frame):
# Sender can interleave a control message while sending fragmented
# messages.
if common.is_control_opcode(frame.opcode):
if self._control_opcode is not None:
raise MuxUnexpectedException(
'Should not reach here(Bug in builder)')
return self._process_first_fragmented_control(frame)
if frame.opcode != common.OPCODE_CONTINUATION:
raise InvalidFrameException(
'Sending invalid opcode %d while sending fragmented message' %
frame.opcode)
self._pending_message_fragments.append(frame.payload)
if not frame.fin:
return None
return self._reassemble_fragmented_message()
def _reassemble_fragmented_message(self):
opcode = self._message_opcode
payload = ''.join(self._pending_message_fragments)
self._message_opcode = None
self._pending_message_fragments = []
self._frame_handler = self._handle_first
return _InnerMessage(opcode, payload)
def build(self, frame):
"""Build an inner message. Returns an _InnerMessage instance when
the given frame is the last fragmented frame. Returns None otherwise.
Args:
frame: an inner frame.
Raises:
InvalidFrameException: when received invalid opcode. (e.g.
receiving non continuation data opcode but the fin flag of
the previous inner frame was not set.)
"""
return self._frame_handler(frame)
class _LogicalStream(Stream):
"""Mimics the Stream class. This class interprets multiplexed WebSocket
frames.
"""
def __init__(self, request, stream_options, send_quota, receive_quota):
"""Constructs an instance.
Args:
request: _LogicalRequest instance.
stream_options: StreamOptions instance.
send_quota: Initial send quota.
receive_quota: Initial receive quota.
"""
# Physical stream is responsible for masking.
stream_options.unmask_receive = False
Stream.__init__(self, request, stream_options)
self._send_closed = False
self._send_quota = send_quota
# - Protects _send_closed and _send_quota
# - Signals the thread waiting for send quota replenished
self._send_condition = threading.Condition()
# The opcode of the first frame in messages.
self._message_opcode = common.OPCODE_TEXT
# True when the last message was fragmented.
self._last_message_was_fragmented = False
self._receive_quota = receive_quota
self._write_inner_frame_semaphore = threading.Semaphore()
self._inner_message_builder = _InnerMessageBuilder()
def _create_inner_frame(self, opcode, payload, end=True):
frame = Frame(fin=end, opcode=opcode, payload=payload)
for frame_filter in self._options.outgoing_frame_filters:
frame_filter.filter(frame)
if len(payload) != len(frame.payload):
raise MuxUnexpectedException(
'Mux extension must not be used after extensions which change '
' frame boundary')
first_byte = ((frame.fin << 7) | (frame.rsv1 << 6) |
(frame.rsv2 << 5) | (frame.rsv3 << 4) | frame.opcode)
return chr(first_byte) + frame.payload
def _write_inner_frame(self, opcode, payload, end=True):
payload_length = len(payload)
write_position = 0
try:
# An inner frame will be fragmented if there is no enough send
# quota. This semaphore ensures that fragmented inner frames are
# sent in order on the logical channel.
# Note that frames that come from other logical channels or
# multiplexing control blocks can be inserted between fragmented
# inner frames on the physical channel.
self._write_inner_frame_semaphore.acquire()
# Consume an octet quota when this is the first fragmented frame.
if opcode != common.OPCODE_CONTINUATION:
try:
self._send_condition.acquire()
while (not self._send_closed) and self._send_quota == 0:
self._send_condition.wait()
if self._send_closed:
raise BadOperationException(
'Logical connection %d is closed' %
self._request.channel_id)
self._send_quota -= 1
finally:
self._send_condition.release()
while write_position < payload_length:
try:
self._send_condition.acquire()
while (not self._send_closed) and self._send_quota == 0:
self._logger.debug(
'No quota. Waiting FlowControl message for %d.' %
self._request.channel_id)
self._send_condition.wait()
if self._send_closed:
raise BadOperationException(
'Logical connection %d is closed' %
self.request._channel_id)
remaining = payload_length - write_position
write_length = min(self._send_quota, remaining)
inner_frame_end = (
end and
(write_position + write_length == payload_length))
inner_frame = self._create_inner_frame(
opcode,
payload[write_position:write_position+write_length],
inner_frame_end)
self._send_quota -= write_length
self._logger.debug('Consumed quota=%d, remaining=%d' %
(write_length, self._send_quota))
finally:
self._send_condition.release()
# Writing data will block the worker so we need to release
# _send_condition before writing.
self._logger.debug('Sending inner frame: %r' % inner_frame)
self._request.connection.write(inner_frame)
write_position += write_length
opcode = common.OPCODE_CONTINUATION
except ValueError, e:
raise BadOperationException(e)
finally:
self._write_inner_frame_semaphore.release()
def replenish_send_quota(self, send_quota):
"""Replenish send quota."""
try:
self._send_condition.acquire()
if self._send_quota + send_quota > 0x7FFFFFFFFFFFFFFF:
self._send_quota = 0
raise LogicalChannelError(
self._request.channel_id, _DROP_CODE_SEND_QUOTA_OVERFLOW)
self._send_quota += send_quota
self._logger.debug('Replenished send quota for channel id %d: %d' %
(self._request.channel_id, self._send_quota))
finally:
self._send_condition.notify()
self._send_condition.release()
def consume_receive_quota(self, amount):
"""Consumes receive quota. Returns False on failure."""
if self._receive_quota < amount:
self._logger.debug('Violate quota on channel id %d: %d < %d' %
(self._request.channel_id,
self._receive_quota, amount))
return False
self._receive_quota -= amount
return True
def send_message(self, message, end=True, binary=False):
"""Override Stream.send_message."""
if self._request.server_terminated:
raise BadOperationException(
'Requested send_message after sending out a closing handshake')
if binary and isinstance(message, unicode):
raise BadOperationException(
'Message for binary frame must be instance of str')
if binary:
opcode = common.OPCODE_BINARY
else:
opcode = common.OPCODE_TEXT
message = message.encode('utf-8')
for message_filter in self._options.outgoing_message_filters:
message = message_filter.filter(message, end, binary)
if self._last_message_was_fragmented:
if opcode != self._message_opcode:
raise BadOperationException('Message types are different in '
'frames for the same message')
opcode = common.OPCODE_CONTINUATION
else:
self._message_opcode = opcode
self._write_inner_frame(opcode, message, end)
self._last_message_was_fragmented = not end
def _receive_frame(self):
"""Overrides Stream._receive_frame.
In addition to call Stream._receive_frame, this method adds the amount
of payload to receiving quota and sends FlowControl to the client.
We need to do it here because Stream.receive_message() handles
control frames internally.
"""
opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self)
amount = len(payload)
# Replenish extra one octet when receiving the first fragmented frame.
if opcode != common.OPCODE_CONTINUATION:
amount += 1
self._receive_quota += amount
frame_data = _create_flow_control(self._request.channel_id,
amount)
self._logger.debug('Sending flow control for %d, replenished=%d' %
(self._request.channel_id, amount))
self._request.connection.write_control_data(frame_data)
return opcode, payload, fin, rsv1, rsv2, rsv3
def _get_message_from_frame(self, frame):
"""Overrides Stream._get_message_from_frame.
"""
try:
inner_message = self._inner_message_builder.build(frame)
except InvalidFrameException:
raise LogicalChannelError(
self._request.channel_id, _DROP_CODE_BAD_FRAGMENTATION)
if inner_message is None:
return None
self._original_opcode = inner_message.opcode
return inner_message.payload
def receive_message(self):
"""Overrides Stream.receive_message."""
# Just call Stream.receive_message(), but catch
# LogicalConnectionClosedException, which is raised when the logical
# connection has closed gracefully.
try:
return Stream.receive_message(self)
except LogicalConnectionClosedException, e:
self._logger.debug('%s', e)
return None
def _send_closing_handshake(self, code, reason):
"""Overrides Stream._send_closing_handshake."""
body = create_closing_handshake_body(code, reason)
self._logger.debug('Sending closing handshake for %d: (%r, %r)' %
(self._request.channel_id, code, reason))
self._write_inner_frame(common.OPCODE_CLOSE, body, end=True)
self._request.server_terminated = True
def send_ping(self, body=''):
"""Overrides Stream.send_ping"""
self._logger.debug('Sending ping on logical channel %d: %r' %
(self._request.channel_id, body))
self._write_inner_frame(common.OPCODE_PING, body, end=True)
self._ping_queue.append(body)
def _send_pong(self, body):
"""Overrides Stream._send_pong"""
self._logger.debug('Sending pong on logical channel %d: %r' %
(self._request.channel_id, body))
self._write_inner_frame(common.OPCODE_PONG, body, end=True)
def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
"""Overrides Stream.close_connection."""
# TODO(bashi): Implement
self._logger.debug('Closing logical connection %d' %
self._request.channel_id)
self._request.server_terminated = True
def _drain_received_data(self):
"""Overrides Stream._drain_received_data. Nothing need to be done for
logical channel.
"""
pass
def stop_sending(self):
"""Stops accepting new send operation (_write_inner_frame)."""
self._send_condition.acquire()
self._send_closed = True
self._send_condition.notify()
self._send_condition.release()
class _OutgoingData(object):
"""A structure that holds data to be sent via physical connection and
origin of the data.
"""
def __init__(self, channel_id, data):
self.channel_id = channel_id
self.data = data
class _PhysicalConnectionWriter(threading.Thread):
"""A thread that is responsible for writing data to physical connection.
TODO(bashi): Make sure there is no thread-safety problem when the reader
thread reads data from the same socket at a time.
"""
def __init__(self, mux_handler):
"""Constructs an instance.
Args:
mux_handler: _MuxHandler instance.
"""
threading.Thread.__init__(self)
self._logger = util.get_class_logger(self)
self._mux_handler = mux_handler
self.setDaemon(True)
# When set, make this thread stop accepting new data, flush pending
# data and exit.
self._stop_requested = False
# The close code of the physical connection.
self._close_code = common.STATUS_NORMAL_CLOSURE
# Deque for passing write data. It's protected by _deque_condition
# until _stop_requested is set.
self._deque = collections.deque()
# - Protects _deque, _stop_requested and _close_code
# - Signals threads waiting for them to be available
self._deque_condition = threading.Condition()
def put_outgoing_data(self, data):
"""Puts outgoing data.
Args:
data: _OutgoingData instance.
Raises:
BadOperationException: when the thread has been requested to
terminate.
"""
try:
self._deque_condition.acquire()
if self._stop_requested:
raise BadOperationException('Cannot write data anymore')
self._deque.append(data)
self._deque_condition.notify()
finally:
self._deque_condition.release()
def _write_data(self, outgoing_data):
message = (_encode_channel_id(outgoing_data.channel_id) +
outgoing_data.data)
try:
self._mux_handler.physical_stream.send_message(
message=message, end=True, binary=True)
except Exception, e:
util.prepend_message_to_exception(
'Failed to send message to %r: ' %
(self._mux_handler.physical_connection.remote_addr,), e)
raise
# TODO(bashi): It would be better to block the thread that sends
# control data as well.
if outgoing_data.channel_id != _CONTROL_CHANNEL_ID:
self._mux_handler.notify_write_data_done(outgoing_data.channel_id)
def run(self):
try:
self._deque_condition.acquire()
while not self._stop_requested:
if len(self._deque) == 0:
self._deque_condition.wait()
continue
outgoing_data = self._deque.popleft()
self._deque_condition.release()
self._write_data(outgoing_data)
self._deque_condition.acquire()
# Flush deque.
#
# At this point, self._deque_condition is always acquired.
try:
while len(self._deque) > 0:
outgoing_data = self._deque.popleft()
self._write_data(outgoing_data)
finally:
self._deque_condition.release()
# Close physical connection.
try:
# Don't wait the response here. The response will be read
# by the reader thread.
self._mux_handler.physical_stream.close_connection(
self._close_code, wait_response=False)
except Exception, e:
util.prepend_message_to_exception(
'Failed to close the physical connection: %r' % e)
raise
finally:
self._mux_handler.notify_writer_done()
def stop(self, close_code=common.STATUS_NORMAL_CLOSURE):
"""Stops the writer thread."""
self._deque_condition.acquire()
self._stop_requested = True
self._close_code = close_code
self._deque_condition.notify()
self._deque_condition.release()
class _PhysicalConnectionReader(threading.Thread):
"""A thread that is responsible for reading data from physical connection.
"""
def __init__(self, mux_handler):
"""Constructs an instance.
Args:
mux_handler: _MuxHandler instance.
"""
threading.Thread.__init__(self)
self._logger = util.get_class_logger(self)
self._mux_handler = mux_handler
self.setDaemon(True)
def run(self):
while True:
try:
physical_stream = self._mux_handler.physical_stream
message = physical_stream.receive_message()
if message is None:
break
# Below happens only when a data message is received.
opcode = physical_stream.get_last_received_opcode()
if opcode != common.OPCODE_BINARY:
self._mux_handler.fail_physical_connection(
_DROP_CODE_INVALID_ENCAPSULATING_MESSAGE,
'Received a text message on physical connection')
break
except ConnectionTerminatedException, e:
self._logger.debug('%s', e)
break
try:
self._mux_handler.dispatch_message(message)
except PhysicalConnectionError, e:
self._mux_handler.fail_physical_connection(
e.drop_code, e.message)
break
except LogicalChannelError, e:
self._mux_handler.fail_logical_channel(
e.channel_id, e.drop_code, e.message)
except Exception, e:
self._logger.debug(traceback.format_exc())
break
self._mux_handler.notify_reader_done()
class _Worker(threading.Thread):
"""A thread that is responsible for running the corresponding application
handler.
"""
def __init__(self, mux_handler, request):
"""Constructs an instance.
Args:
mux_handler: _MuxHandler instance.
request: _LogicalRequest instance.
"""
threading.Thread.__init__(self)
self._logger = util.get_class_logger(self)
self._mux_handler = mux_handler
self._request = request
self.setDaemon(True)
def run(self):
self._logger.debug('Logical channel worker started. (id=%d)' %
self._request.channel_id)
try:
# Non-critical exceptions will be handled by dispatcher.
self._mux_handler.dispatcher.transfer_data(self._request)
except LogicalChannelError, e:
self._mux_handler.fail_logical_channel(
e.channel_id, e.drop_code, e.message)
finally:
self._mux_handler.notify_worker_done(self._request.channel_id)
class _MuxHandshaker(hybi.Handshaker):
"""Opening handshake processor for multiplexing."""
_DUMMY_WEBSOCKET_KEY = 'dGhlIHNhbXBsZSBub25jZQ=='
def __init__(self, request, dispatcher, send_quota, receive_quota):
"""Constructs an instance.
Args:
request: _LogicalRequest instance.
dispatcher: Dispatcher instance (dispatch.Dispatcher).
send_quota: Initial send quota.
receive_quota: Initial receive quota.
"""
hybi.Handshaker.__init__(self, request, dispatcher)
self._send_quota = send_quota
self._receive_quota = receive_quota
# Append headers which should not be included in handshake field of
# AddChannelRequest.
# TODO(bashi): Make sure whether we should raise exception when
# these headers are included already.
request.headers_in[common.UPGRADE_HEADER] = (
common.WEBSOCKET_UPGRADE_TYPE)
request.headers_in[common.SEC_WEBSOCKET_VERSION_HEADER] = (
str(common.VERSION_HYBI_LATEST))
request.headers_in[common.SEC_WEBSOCKET_KEY_HEADER] = (
self._DUMMY_WEBSOCKET_KEY)
def _create_stream(self, stream_options):
"""Override hybi.Handshaker._create_stream."""
self._logger.debug('Creating logical stream for %d' %
self._request.channel_id)
return _LogicalStream(
self._request, stream_options, self._send_quota,
self._receive_quota)
def _create_handshake_response(self, accept):
"""Override hybi._create_handshake_response."""
response = []
response.append('HTTP/1.1 101 Switching Protocols\r\n')
# Upgrade and Sec-WebSocket-Accept should be excluded.
response.append('%s: %s\r\n' % (
common.CONNECTION_HEADER, common.UPGRADE_CONNECTION_TYPE))
if self._request.ws_protocol is not None:
response.append('%s: %s\r\n' % (
common.SEC_WEBSOCKET_PROTOCOL_HEADER,
self._request.ws_protocol))
if (self._request.ws_extensions is not None and
len(self._request.ws_extensions) != 0):
response.append('%s: %s\r\n' % (
common.SEC_WEBSOCKET_EXTENSIONS_HEADER,
common.format_extensions(self._request.ws_extensions)))
response.append('\r\n')
return ''.join(response)
def _send_handshake(self, accept):
"""Override hybi.Handshaker._send_handshake."""
# Don't send handshake response for the default channel
if self._request.channel_id == _DEFAULT_CHANNEL_ID:
return
handshake_response = self._create_handshake_response(accept)
frame_data = _create_add_channel_response(
self._request.channel_id,
handshake_response)
self._logger.debug('Sending handshake response for %d: %r' %
(self._request.channel_id, frame_data))
self._request.connection.write_control_data(frame_data)
class _LogicalChannelData(object):
"""A structure that holds information about logical channel.
"""
def __init__(self, request, worker):
self.request = request
self.worker = worker
self.drop_code = _DROP_CODE_NORMAL_CLOSURE
self.drop_message = ''
class _HandshakeDeltaBase(object):
"""A class that holds information for delta-encoded handshake."""
def __init__(self, headers):
self._headers = headers
def create_headers(self, delta=None):
"""Creates request headers for an AddChannelRequest that has
delta-encoded handshake.
Args:
delta: headers should be overridden.
"""
headers = copy.copy(self._headers)
if delta:
for key, value in delta.items():
# The spec requires that a header with an empty value is
# removed from the delta base.
if len(value) == 0 and headers.has_key(key):
del headers[key]
else:
headers[key] = value
return headers
class _MuxHandler(object):
"""Multiplexing handler. When a handler starts, it launches three
threads; the reader thread, the writer thread, and a worker thread.
The reader thread reads data from the physical stream, i.e., the
ws_stream object of the underlying websocket connection. The reader
thread interprets multiplexed frames and dispatches them to logical
channels. Methods of this class are mostly called by the reader thread.
The writer thread sends multiplexed frames which are created by
logical channels via the physical connection.
The worker thread launched at the starting point handles the
"Implicitly Opened Connection". If multiplexing handler receives
an AddChannelRequest and accepts it, the handler will launch a new worker
thread and dispatch the request to it.
"""
def __init__(self, request, dispatcher):
"""Constructs an instance.
Args:
request: mod_python request of the physical connection.
dispatcher: Dispatcher instance (dispatch.Dispatcher).
"""
self.original_request = request
self.dispatcher = dispatcher
self.physical_connection = request.connection
self.physical_stream = request.ws_stream
self._logger = util.get_class_logger(self)
self._logical_channels = {}
self._logical_channels_condition = threading.Condition()
# Holds client's initial quota
self._channel_slots = collections.deque()
self._handshake_base = None
self._worker_done_notify_received = False
self._reader = None
self._writer = None
def start(self):
"""Starts the handler.
Raises:
MuxUnexpectedException: when the handler already started, or when
opening handshake of the default channel fails.
"""
if self._reader or self._writer:
raise MuxUnexpectedException('MuxHandler already started')
self._reader = _PhysicalConnectionReader(self)
self._writer = _PhysicalConnectionWriter(self)
self._reader.start()
self._writer.start()
# Create "Implicitly Opened Connection".
logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID)
headers = copy.copy(self.original_request.headers_in)
# Add extensions for logical channel.
headers[common.SEC_WEBSOCKET_EXTENSIONS_HEADER] = (
common.format_extensions(
self.original_request.mux_processor.extensions()))
self._handshake_base = _HandshakeDeltaBase(headers)
logical_request = _LogicalRequest(
_DEFAULT_CHANNEL_ID,
self.original_request.method,
self.original_request.uri,
self.original_request.protocol,
self._handshake_base.create_headers(),
logical_connection)
# Client's send quota for the implicitly opened connection is zero,
# but we will send FlowControl later so set the initial quota to
# _INITIAL_QUOTA_FOR_CLIENT.
self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT)
send_quota = self.original_request.mux_processor.quota()
if not self._do_handshake_for_logical_request(
logical_request, send_quota=send_quota):
raise MuxUnexpectedException(
'Failed handshake on the default channel id')
self._add_logical_channel(logical_request)
# Send FlowControl for the implicitly opened connection.
frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID,
_INITIAL_QUOTA_FOR_CLIENT)
logical_request.connection.write_control_data(frame_data)
def add_channel_slots(self, slots, send_quota):
"""Adds channel slots.
Args:
slots: number of slots to be added.
send_quota: initial send quota for slots.
"""
self._channel_slots.extend([send_quota] * slots)
# Send NewChannelSlot to client.
frame_data = _create_new_channel_slot(slots, send_quota)
self.send_control_data(frame_data)
def wait_until_done(self, timeout=None):
"""Waits until all workers are done. Returns False when timeout has
occurred. Returns True on success.
Args:
timeout: timeout in sec.
"""
self._logical_channels_condition.acquire()
try:
while len(self._logical_channels) > 0:
self._logger.debug('Waiting workers(%d)...' %
len(self._logical_channels))
self._worker_done_notify_received = False
self._logical_channels_condition.wait(timeout)
if not self._worker_done_notify_received:
self._logger.debug('Waiting worker(s) timed out')
return False
finally:
self._logical_channels_condition.release()
# Flush pending outgoing data
self._writer.stop()
self._writer.join()
return True
def notify_write_data_done(self, channel_id):
"""Called by the writer thread when a write operation has done.
Args:
channel_id: objective channel id.
"""
try:
self._logical_channels_condition.acquire()
if channel_id in self._logical_channels:
channel_data = self._logical_channels[channel_id]
channel_data.request.connection.on_write_data_done()
else:
self._logger.debug('Seems that logical channel for %d has gone'
% channel_id)
finally:
self._logical_channels_condition.release()
def send_control_data(self, data):
"""Sends data via the control channel.
Args:
data: data to be sent.
"""
self._writer.put_outgoing_data(_OutgoingData(
channel_id=_CONTROL_CHANNEL_ID, data=data))
def send_data(self, channel_id, data):
"""Sends data via given logical channel. This method is called by
worker threads.
Args:
channel_id: objective channel id.
data: data to be sent.
"""
self._writer.put_outgoing_data(_OutgoingData(
channel_id=channel_id, data=data))
def _send_drop_channel(self, channel_id, code=None, message=''):
frame_data = _create_drop_channel(channel_id, code, message)
self._logger.debug(
'Sending drop channel for channel id %d' % channel_id)
self.send_control_data(frame_data)
def _send_error_add_channel_response(self, channel_id, status=None):
if status is None:
status = common.HTTP_STATUS_BAD_REQUEST
if status in _HTTP_BAD_RESPONSE_MESSAGES:
message = _HTTP_BAD_RESPONSE_MESSAGES[status]
else:
self._logger.debug('Response message for %d is not found' % status)
message = '???'
response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message)
frame_data = _create_add_channel_response(channel_id,
encoded_handshake=response,
encoding=0, rejected=True)
self.send_control_data(frame_data)
def _create_logical_request(self, block):
if block.channel_id == _CONTROL_CHANNEL_ID:
# TODO(bashi): Raise PhysicalConnectionError with code 2006
# instead of MuxUnexpectedException.
raise MuxUnexpectedException(
'Received the control channel id (0) as objective channel '
'id for AddChannel')
if block.encoding > _HANDSHAKE_ENCODING_DELTA:
raise PhysicalConnectionError(
_DROP_CODE_UNKNOWN_REQUEST_ENCODING)
method, path, version, headers = _parse_request_text(
block.encoded_handshake)
if block.encoding == _HANDSHAKE_ENCODING_DELTA:
headers = self._handshake_base.create_headers(headers)
connection = _LogicalConnection(self, block.channel_id)
request = _LogicalRequest(block.channel_id, method, path, version,
headers, connection)
return request
def _do_handshake_for_logical_request(self, request, send_quota=0):
try:
receive_quota = self._channel_slots.popleft()
except IndexError:
raise LogicalChannelError(
request.channel_id, _DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION)
handshaker = _MuxHandshaker(request, self.dispatcher,
send_quota, receive_quota)
try:
handshaker.do_handshake()
except handshake.VersionException, e:
self._logger.info('%s', e)
self._send_error_add_channel_response(
request.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
return False
except handshake.HandshakeException, e:
# TODO(bashi): Should we _Fail the Logical Channel_ with 3001
# instead?
self._logger.info('%s', e)
self._send_error_add_channel_response(request.channel_id,
status=e.status)
return False
except handshake.AbortedByUserException, e:
self._logger.info('%s', e)
self._send_error_add_channel_response(request.channel_id)
return False
return True
def _add_logical_channel(self, logical_request):
try:
self._logical_channels_condition.acquire()
if logical_request.channel_id in self._logical_channels:
self._logger.debug('Channel id %d already exists' %
logical_request.channel_id)
raise PhysicalConnectionError(
_DROP_CODE_CHANNEL_ALREADY_EXISTS,
'Channel id %d already exists' %
logical_request.channel_id)
worker = _Worker(self, logical_request)
channel_data = _LogicalChannelData(logical_request, worker)
self._logical_channels[logical_request.channel_id] = channel_data
worker.start()
finally:
self._logical_channels_condition.release()
def _process_add_channel_request(self, block):
try:
logical_request = self._create_logical_request(block)
except ValueError, e:
self._logger.debug('Failed to create logical request: %r' % e)
self._send_error_add_channel_response(
block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
return
if self._do_handshake_for_logical_request(logical_request):
if block.encoding == _HANDSHAKE_ENCODING_IDENTITY:
# Update handshake base.
# TODO(bashi): Make sure this is the right place to update
# handshake base.
self._handshake_base = _HandshakeDeltaBase(
logical_request.headers_in)
self._add_logical_channel(logical_request)
else:
self._send_error_add_channel_response(
block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
def _process_flow_control(self, block):
try:
self._logical_channels_condition.acquire()
if not block.channel_id in self._logical_channels:
return
channel_data = self._logical_channels[block.channel_id]
channel_data.request.ws_stream.replenish_send_quota(
block.send_quota)
finally:
self._logical_channels_condition.release()
def _process_drop_channel(self, block):
self._logger.debug(
'DropChannel received for %d: code=%r, reason=%r' %
(block.channel_id, block.drop_code, block.drop_message))
try:
self._logical_channels_condition.acquire()
if not block.channel_id in self._logical_channels:
return
channel_data = self._logical_channels[block.channel_id]
channel_data.drop_code = _DROP_CODE_ACKNOWLEDGED
# Close the logical channel
channel_data.request.connection.set_read_state(
_LogicalConnection.STATE_TERMINATED)
channel_data.request.ws_stream.stop_sending()
finally:
self._logical_channels_condition.release()
def _process_control_blocks(self, parser):
for control_block in parser.read_control_blocks():
opcode = control_block.opcode
self._logger.debug('control block received, opcode: %d' % opcode)
if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
self._process_add_channel_request(control_block)
elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Received AddChannelResponse')
elif opcode == _MUX_OPCODE_FLOW_CONTROL:
self._process_flow_control(control_block)
elif opcode == _MUX_OPCODE_DROP_CHANNEL:
self._process_drop_channel(control_block)
elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
raise PhysicalConnectionError(
_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
'Received NewChannelSlot')
else:
raise MuxUnexpectedException(
'Unexpected opcode %r' % opcode)
def _process_logical_frame(self, channel_id, parser):
self._logger.debug('Received a frame. channel id=%d' % channel_id)
try:
self._logical_channels_condition.acquire()
if not channel_id in self._logical_channels:
# We must ignore the message for an inactive channel.
return
channel_data = self._logical_channels[channel_id]
fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame()
consuming_byte = len(payload)
if opcode != common.OPCODE_CONTINUATION:
consuming_byte += 1
if not channel_data.request.ws_stream.consume_receive_quota(
consuming_byte):
# The client violates quota. Close logical channel.
raise LogicalChannelError(
channel_id, _DROP_CODE_SEND_QUOTA_VIOLATION)
header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3,
mask=False)
frame_data = header + payload
channel_data.request.connection.append_frame_data(frame_data)
finally:
self._logical_channels_condition.release()
def dispatch_message(self, message):
"""Dispatches message. The reader thread calls this method.
Args:
message: a message that contains encapsulated frame.
Raises:
PhysicalConnectionError: if the message contains physical
connection level errors.
LogicalChannelError: if the message contains logical channel
level errors.
"""
parser = _MuxFramePayloadParser(message)
try:
channel_id = parser.read_channel_id()
except ValueError, e:
raise PhysicalConnectionError(_DROP_CODE_CHANNEL_ID_TRUNCATED)
if channel_id == _CONTROL_CHANNEL_ID:
self._process_control_blocks(parser)
else:
self._process_logical_frame(channel_id, parser)
def notify_worker_done(self, channel_id):
"""Called when a worker has finished.
Args:
channel_id: channel id corresponded with the worker.
"""
self._logger.debug('Worker for channel id %d terminated' % channel_id)
try:
self._logical_channels_condition.acquire()
if not channel_id in self._logical_channels:
raise MuxUnexpectedException(
'Channel id %d not found' % channel_id)
channel_data = self._logical_channels.pop(channel_id)
finally:
self._worker_done_notify_received = True
self._logical_channels_condition.notify()
self._logical_channels_condition.release()
if not channel_data.request.server_terminated:
self._send_drop_channel(
channel_id, code=channel_data.drop_code,
message=channel_data.drop_message)
def notify_reader_done(self):
"""This method is called by the reader thread when the reader has
finished.
"""
self._logger.debug(
'Termiating all logical connections waiting for incoming data '
'...')
self._logical_channels_condition.acquire()
for channel_data in self._logical_channels.values():
try:
channel_data.request.connection.set_read_state(
_LogicalConnection.STATE_TERMINATED)
except Exception:
self._logger.debug(traceback.format_exc())
self._logical_channels_condition.release()
def notify_writer_done(self):
"""This method is called by the writer thread when the writer has
finished.
"""
self._logger.debug(
'Termiating all logical connections waiting for write '
'completion ...')
self._logical_channels_condition.acquire()
for channel_data in self._logical_channels.values():
try:
channel_data.request.connection.on_writer_done()
except Exception:
self._logger.debug(traceback.format_exc())
self._logical_channels_condition.release()
def fail_physical_connection(self, code, message):
"""Fail the physical connection.
Args:
code: drop reason code.
message: drop message.
"""
self._logger.debug('Failing the physical connection...')
self._send_drop_channel(_CONTROL_CHANNEL_ID, code, message)
self._writer.stop(common.STATUS_INTERNAL_ENDPOINT_ERROR)
def fail_logical_channel(self, channel_id, code, message):
"""Fail a logical channel.
Args:
channel_id: channel id.
code: drop reason code.
message: drop message.
"""
self._logger.debug('Failing logical channel %d...' % channel_id)
try:
self._logical_channels_condition.acquire()
if channel_id in self._logical_channels:
channel_data = self._logical_channels[channel_id]
# Close the logical channel. notify_worker_done() will be
# called later and it will send DropChannel.
channel_data.drop_code = code
channel_data.drop_message = message
channel_data.request.connection.set_read_state(
_LogicalConnection.STATE_TERMINATED)
channel_data.request.ws_stream.stop_sending()
else:
self._send_drop_channel(channel_id, code, message)
finally:
self._logical_channels_condition.release()
def use_mux(request):
return hasattr(request, 'mux_processor') and (
request.mux_processor.is_active())
def start(request, dispatcher):
mux_handler = _MuxHandler(request, dispatcher)
mux_handler.start()
mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS,
_INITIAL_QUOTA_FOR_CLIENT)
mux_handler.wait_until_done()
# vi:sts=4 sw=4 et
|