/usr/share/pyshared/pyhsm/stick_client.py is in python-pyhsm 1.0.4f-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""
Module for talking to the YubiHSM over a socket.
"""
# Copyright (c) 2011 Yubico AB
# See the file COPYING for licence statement.
# Names exported by ``from pyhsm.stick_client import *``.
# This module exports no constants and no functions, only one class.
__all__ = ['YHSM_Stick_Client']
import sys
import re
import socket
import pickle
import pyhsm.util
import pyhsm.exception
# Opcodes sent as the first element of every pickled request tuple.
# The numeric values are part of the wire format and must not change.
(
    CMD_WRITE,    # 0
    CMD_READ,     # 1
    CMD_FLUSH,    # 2
    CMD_DRAIN,    # 3
    CMD_LOCK,     # 4
    CMD_UNLOCK,   # 5
) = range(6)

# Device strings look like ``daemon://host`` or ``daemon://host:port``;
# the ``port`` group is None when no port is given.
DEVICE_PATTERN = re.compile(
    r'daemon://'
    r'(?P<host>[^:]+)'
    r'(:(?P<port>\d+))?'
)
class YHSM_Stick_Client():
    """
    Client for a YubiHSM that is reached over a TCP socket.

    Every request is a pickled tuple whose first element is one of the
    CMD_* opcodes; requests that expect an answer read one pickled object
    back from the same connection.

    This class exposes the basic functions read, write, flush and drain,
    plus acquire/release which send the CMD_LOCK / CMD_UNLOCK requests.
    """

    def __init__(self, device, timeout=1, debug=False):
        """
        Open YHSM device.

        `device' is a string matching DEVICE_PATTERN, i.e. of the form
        ``daemon://host:port``. The port is mandatory here.

        `timeout' is accepted for interface compatibility only; it is
        not applied to the socket.

        `debug' enables trace output on stderr.

        Raises ValueError if `device' can not be parsed or has no port.
        Errors from creating/connecting the socket propagate unchanged.
        """
        self.debug = debug
        self.device = device
        # Initialised before any I/O so that __repr__ works even when
        # the connection attempt below fails.
        self.num_read_bytes = 0
        self.num_write_bytes = 0

        match = DEVICE_PATTERN.match(device)
        if match is None or match.group('port') is None:
            raise ValueError(
                "%s: expected device of the form daemon://host:port, got %r" % (
                    self.__class__.__name__,
                    device
                ))

        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            self.socket.connect((match.group('host'), int(match.group('port'))))
            # The file object is used for both pickle.dump() and
            # pickle.load(), so it has to be readable as well as writable.
            self.socket_file = self.socket.makefile('rwb')
        except Exception:
            # Do not leave a half-open socket behind when setup fails.
            self.socket.close()
            raise

        if self.debug:
            sys.stderr.write("%s: OPEN %s\n" % (
                self.__class__.__name__,
                self.socket
            ))

    def _send(self, *command):
        """ Pickle the `command' tuple to the peer and flush it out. """
        pickle.dump(command, self.socket_file)
        self.socket_file.flush()

    def _request(self, *command):
        """ Send the `command' tuple and return the peer's unpickled reply. """
        self._send(*command)
        # NOTE(security): pickle.load() on data received from the network
        # can execute arbitrary code. Only connect to a trusted peer.
        return pickle.load(self.socket_file)

    def acquire(self):
        """
        Send CMD_LOCK (no reply is read).

        Returns the bound `release' method, so the caller can invoke the
        returned object to send the matching CMD_UNLOCK.
        """
        self._send(CMD_LOCK)
        return self.release

    def release(self):
        """ Send CMD_UNLOCK (no reply is read). """
        self._send(CMD_UNLOCK)

    def write(self, data, debug_info=None):
        """
        Write data to YHSM device.

        `debug_info' is only used in debug output and defaults to the
        length of `data'. Returns the reply sent back by the peer.
        """
        self.num_write_bytes += len(data)
        if self.debug:
            if not debug_info:
                debug_info = str(len(data))
            sys.stderr.write("%s: WRITE %s:\n%s\n" % (
                self.__class__.__name__,
                debug_info,
                pyhsm.util.hexdump(data)
            ))
        return self._request(CMD_WRITE, data)

    def read(self, num_bytes, debug_info=None):
        """
        Read a number of bytes from YubiHSM device.

        `debug_info' is only used in debug output and defaults to
        `num_bytes'. Returns the reply sent back by the peer; its length
        is added to the read counter.
        """
        if self.debug:
            if not debug_info:
                debug_info = str(num_bytes)
            sys.stderr.write("%s: READING %s\n" % (
                self.__class__.__name__,
                debug_info
            ))
        res = self._request(CMD_READ, num_bytes)
        if self.debug:
            sys.stderr.write("%s: READ %i:\n%s\n" % (
                self.__class__.__name__,
                len(res),
                pyhsm.util.hexdump(res)
            ))
        self.num_read_bytes += len(res)
        return res

    def flush(self):
        """
        Flush input buffers.

        Sends CMD_FLUSH and returns the peer's reply.
        """
        return self._request(CMD_FLUSH)

    def drain(self):
        """ Drain input. Sends CMD_DRAIN and returns the peer's reply. """
        return self._request(CMD_DRAIN)

    def raw_device(self):
        """ Get the socket address. Only intended for test code/debugging! """
        return self.device

    def set_debug(self, new):
        """
        Set debug mode (boolean).

        Returns old setting.
        Raises pyhsm.exception.YHSM_WrongInputType if `new' is not a bool.
        """
        if not isinstance(new, bool):
            raise pyhsm.exception.YHSM_WrongInputType(
                'new', bool, type(new))
        old = self.debug
        self.debug = new
        return old

    def __repr__(self):
        return '<%s instance at %s: %s - r:%i w:%i>' % (
            self.__class__.__name__,
            hex(id(self)),
            self.device,
            self.num_read_bytes,
            self.num_write_bytes
        )

    def __del__(self):
        """
        Close device when YHSM instance is destroyed.
        """
        # __del__ also runs for instances whose __init__ did not finish,
        # so none of the attributes can be taken for granted here.
        if getattr(self, 'debug', False):
            sys.stderr.write("%s: CLOSE %s\n" % (
                self.__class__.__name__,
                getattr(self, 'device', None)
            ))
        # Close each handle on its own, so that a missing or failing
        # socket_file does not prevent the socket from being closed.
        for name in ('socket_file', 'socket'):
            handle = getattr(self, name, None)
            if handle is None:
                continue
            try:
                handle.close()
            except Exception:
                # Best effort: a finalizer must never raise.
                pass
|