/usr/bin/wsdump is in python-websocket 0.18.0-2.
This file is owned by root:root, with mode 0o755.
The actual contents of the file can be viewed below.
#!/usr/bin/python
import argparse
import code
import six
import sys
import threading
import websocket
try:
import readline
except:
pass
# Frame opcodes this tool treats as data frames: text and binary.
OPCODE_DATA = (
    websocket.ABNF.OPCODE_TEXT,
    websocket.ABNF.OPCODE_BINARY,
)
# Lower-cased name of the encoding reported by stdin, or "" when unknown.
# The attribute can exist but be None (e.g. when stdin is not a terminal),
# so coerce a falsy value to "" before calling .lower().
ENCODING = (getattr(sys.stdin, "encoding", None) or "").lower()
class VAction(argparse.Action):
    """Argparse action that stores a verbosity option as an integer level.

    The option value may be absent, a number, or a run of extra ``v``
    characters; all three forms are normalised to an ``int``.
    """

    def __call__(self, parser, args, values, option_string=None):
        """Convert ``values`` to an integer level and store it on ``args``.

        Args:
            parser: The parser invoking the action (unused).
            args: Namespace that receives the level under ``self.dest``.
            values: Raw option value; ``None`` when the flag had no value.
            option_string: The flag as written on the command line (unused).
        """
        if values is None:
            # Flag given without a value: treat it as level 1.
            values = "1"
        try:
            values = int(values)
        except ValueError:
            # Non-numeric value: count the "v" characters and add one
            # (so a value of "vv" yields level 3).
            values = values.count("v") + 1
        setattr(args, self.dest, values)
def parse_args():
    """Build the command-line parser for the dump tool and parse the arguments.

    Returns:
        The namespace produced by argparse, carrying ``url`` and ``verbose``.
    """
    cli = argparse.ArgumentParser(description="WebSocket Simple Dump Tool")

    # Positional argument: the websocket endpoint to connect to.
    cli.add_argument(
        "url",
        metavar="ws_url",
        help="websocket url. ex. ws://echo.websocket.org/",
    )

    # Optional verbosity level, normalised to an int by VAction.
    verbose_help = ("set verbose mode. If set to 1, show opcode. "
                    "If set to 2, enable to trace websocket module")
    cli.add_argument(
        "-v", "--verbose",
        action=VAction,
        nargs='?',
        default=0,
        dest="verbose",
        help=verbose_help,
    )

    return cli.parse_args()
class InteractiveConsole(code.InteractiveConsole):
    """Console used by the dump tool for prompting and for showing messages."""

    def write(self, data):
        """Print ``data`` in colour on a fresh line, then redraw the prompt.

        The escape sequences are ANSI/ECMA-48 codes: erase the current line
        and move to the next one, switch the foreground to blue for the
        message, then restore the default foreground colour.
        """
        stream = sys.stdout
        stream.write("\033[2K\033[E")
        colored = "\033[34m" + data + "\033[39m"
        stream.write(colored)
        stream.write("\n> ")
        stream.flush()

    def raw_input(self, prompt):
        """Read one line from the user and return it encoded as UTF-8.

        Text (unicode) input is encoded directly. Byte input is transcoded
        from ``ENCODING`` when that is set and is not already UTF-8;
        otherwise it is returned unchanged.
        """
        # Pick the line reader matching the running Python major version.
        read_line = input if six.PY3 else raw_input
        line = read_line(prompt)

        if isinstance(line, six.text_type):
            return line.encode("utf-8")
        if ENCODING and ENCODING != "utf-8":
            return line.decode(ENCODING).encode("utf-8")
        return line
def main():
    """Connect to the websocket URL from the command line and dump its traffic.

    A daemon thread prints every received frame through the console while
    the main thread reads lines from the user and sends them. The function
    returns when the user presses Ctrl+C or sends end-of-file.
    """
    args = parse_args()
    console = InteractiveConsole()
    if args.verbose > 1:
        websocket.enableTrace(True)
    ws = websocket.create_connection(args.url)
    print("Press Ctrl+C to quit")

    def recv():
        """Read one frame, answer control frames, and return (opcode, data)."""
        frame = ws.recv_frame()
        if not frame:
            raise websocket.WebSocketException("Not a valid frame %s" % frame)
        if frame.opcode == websocket.ABNF.OPCODE_CLOSE:
            ws.send_close()
            return frame.opcode, None
        if frame.opcode == websocket.ABNF.OPCODE_PING:
            # A pong must carry the same application data as the ping it
            # answers, so echo the payload instead of a fixed string.
            ws.pong(frame.data)
        return frame.opcode, frame.data

    def recv_ws():
        """Print incoming frames until the peer closes the connection."""
        while True:
            opcode, data = recv()
            if (six.PY3 and opcode == websocket.ABNF.OPCODE_TEXT
                    and isinstance(data, bytes)):
                # Show text payloads as text rather than as a b'...' repr.
                data = data.decode("utf-8", "replace")
            msg = None
            if not args.verbose and opcode in OPCODE_DATA:
                msg = "< %s" % data
            elif args.verbose:
                msg = "< %s: %s" % (websocket.ABNF.OPCODE_MAP.get(opcode), data)
            if msg:
                console.write(msg)
            if opcode == websocket.ABNF.OPCODE_CLOSE:
                # The close was already acknowledged in recv(); reading
                # further would only fail on the closed connection.
                break

    # Daemon thread so a blocked receive never keeps the process alive.
    thread = threading.Thread(target=recv_ws)
    thread.daemon = True
    thread.start()

    while True:
        try:
            message = console.raw_input("> ")
            ws.send(message)
        except (KeyboardInterrupt, EOFError):
            return
if __name__ == "__main__":
    # Script entry point: run the tool and report any failure as a
    # one-line message instead of a traceback.
    try:
        main()
    except Exception as error:
        print(error)
|