/usr/share/pyshared/nxt/system.py is in python-nxt 2.2.2-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 | # nxt.system module -- LEGO Mindstorms NXT system telegrams
# Copyright (C) 2006 Douglas P Lau
# Copyright (C) 2009 Marcus Wanner
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
'Use for communications regarding the NXT filesystem and such ***ADVANCED USERS ONLY***'
def _create(opcode):
'Create a simple system telegram'
from telegram import Telegram
return Telegram(False, opcode)
def _create_with_file(opcode, fname):
tgram = _create(opcode)
tgram.add_filename(fname)
return tgram
def _create_with_handle(opcode, handle):
tgram = _create(opcode)
tgram.add_u8(handle)
return tgram
def open_read(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_read(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u32()
return (handle, n_bytes)
def open_write(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def _parse_open_write(tgram):
tgram.check_status()
handle = tgram.parse_u8()
return handle
def read(opcode, handle, n_bytes):
tgram = _create_with_handle(opcode, handle)
tgram.add_u16(n_bytes)
return tgram
def _parse_read(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u16()
data = tgram.parse_string()
return (handle, n_bytes, data)
def write(opcode, handle, data):
tgram = _create_with_handle(opcode, handle)
tgram.add_string(len(data), data)
return tgram
def _parse_write(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u16()
return (handle, n_bytes)
def close(opcode, handle):
return _create_with_handle(opcode, handle)
def _parse_close(tgram):
tgram.check_status()
handle = tgram.parse_u8()
return handle
def delete(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_delete(tgram):
tgram.check_status()
handle = tgram.parse_u8()
fname = tgram.parse_string()
return (handle, fname)
def find_first(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_find(tgram):
tgram.check_status()
handle = tgram.parse_u8()
fname = tgram.parse_string(20)
n_bytes = tgram.parse_u32()
return (handle, fname, n_bytes)
def find_next(opcode, handle):
return _create_with_handle(opcode, handle)
def get_firmware_version(opcode):
return _create(opcode)
def _parse_get_firmware_version(tgram):
tgram.check_status()
prot_minor = tgram.parse_u8()
prot_major = tgram.parse_u8()
prot_version = (prot_major, prot_minor)
fw_minor = tgram.parse_u8()
fw_major = tgram.parse_u8()
fw_version = (fw_major, fw_minor)
return (prot_version, fw_version)
def open_write_linear(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def open_read_linear(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_read_linear(tgram):
tgram.check_status()
n_bytes = tgram.parse_u32()
return n_bytes
def open_write_data(opcode, fname, n_bytes):
tgram = _create_with_file(opcode, fname)
tgram.add_u32(n_bytes)
return tgram
def open_append_data(opcode, fname):
return _create_with_file(opcode, fname)
def _parse_open_append_data(tgram):
tgram.check_status()
handle = tgram.parse_u8()
n_bytes = tgram.parse_u32()
return (handle, n_bytes)
def request_first_module(opcode, mname):
return _create_with_file(opcode, mname)
def _parse_request_module(tgram):
tgram.check_status()
handle = tgram.parse_u8()
mname = tgram.parse_string(20)
mod_id = tgram.parse_u32()
mod_size = tgram.parse_u32()
mod_iomap_size = tgram.parse_u16()
return (handle, mname, mod_id, mod_size, mod_iomap_size)
def request_next_module(opcode, handle):
return _create_with_handle(opcode, handle)
def close_module_handle(opcode, handle):
return _create_with_handle(opcode, handle)
def read_io_map(opcode, mod_id, offset, n_bytes):
tgram = _create(opcode)
tgram.add_u32(mod_id)
tgram.add_u16(offset)
tgram.add_u16(n_bytes)
return tgram
def _parse_read_io_map(tgram):
tgram.check_status()
mod_id = tgram.parse_u32()
n_bytes = tgram.parse_u16()
contents = tgram.parse_string()
return (mod_id, n_bytes, contents)
def write_io_map(opcode, mod_id, offset, content):
tgram = _create(opcode)
tgram.add_u32(mod_id)
tgram.add_u16(offset)
tgram.add_u16(len(content))
tgram.add_string(len(content), content)
return tgram
def _parse_write_io_map(tgram):
tgram.check_status()
mod_id = tgram.parse_u32()
n_bytes = tgram.parse_u16()
return (mod_id, n_bytes)
def boot(opcode):
# Note: this command is USB only (no Bluetooth)
tgram = _create(opcode)
tgram.add_string(19, "Let's dance: SAMBA\0")
return tgram
def _parse_boot(tgram):
tgram.check_status()
resp = tgram.parse_string()
# Resp should be 'Yes\0'
return resp
def set_brick_name(opcode, bname):
tgram = _create(opcode)
if len(bname) > 15:
print "Warning! Brick name %s will be truncated to %s!" % (bname, bname[0:15])
bname = bname[0:15]
elif len(bname) < 15:
bname += '\x00' * (15-len(bname)) #fill the extra chars with nulls
tgram.add_string(len(bname), bname)
return tgram
def _parse_set_brick_name(tgram):
tgram.check_status()
def get_device_info(opcode):
return _create(opcode)
def _parse_get_device_info(tgram):
tgram.check_status()
name = tgram.parse_string(15)
a0 = tgram.parse_u8()
a1 = tgram.parse_u8()
a2 = tgram.parse_u8()
a3 = tgram.parse_u8()
a4 = tgram.parse_u8()
a5 = tgram.parse_u8()
a6 = tgram.parse_u8()
# FIXME: what is a6 for?
address = '%02X:%02X:%02X:%02X:%02X:%02X' % (a0, a1, a2, a3, a4, a5)
signal_strength = tgram.parse_u32()
user_flash = tgram.parse_u32()
return (name, address, signal_strength, user_flash)
def delete_user_flash(opcode):
return _create(opcode)
def _parse_delete_user_flash(tgram):
tgram.check_status()
def poll_command_length(opcode, buf_num):
tgram = _create(opcode)
tgram.add_u8(buf_num)
return tgram
def _parse_poll_command_length(tgram):
buf_num = tgram.parse_u8()
tgram.check_status()
n_bytes = tgram.parse_u8()
return (buf_num, n_bytes)
def poll_command(opcode, buf_num, n_bytes):
tgram = _create(opcode)
tgram.add_u8(buf_num)
tgram.add_u8(n_bytes)
return tgram
def _parse_poll_command(tgram):
buf_num = tgram.parse_u8()
tgram.check_status()
n_bytes = tgram.parse_u8()
command = tgram.parse_string()
return (buf_num, n_bytes, command)
def bluetooth_factory_reset(opcode):
# Note: this command is USB only (no Bluetooth)
return _create(opcode)
def _parse_bluetooth_factory_reset(tgram):
tgram.check_status()
#TODO Add docstrings to all methods
OPCODES = {
0x80: (open_read, _parse_open_read),
0x81: (open_write, _parse_open_write),
0x82: (read, _parse_read),
0x83: (write, _parse_write),
0x84: (close, _parse_close),
0x85: (delete, _parse_delete),
0x86: (find_first, _parse_find),
0x87: (find_next, _parse_find),
0x88: (get_firmware_version, _parse_get_firmware_version),
0x89: (open_write_linear, _parse_open_write),
0x8A: (open_read_linear, _parse_open_read_linear),
0x8B: (open_write_data, _parse_open_write),
0x8C: (open_append_data, _parse_open_append_data),
0x90: (request_first_module, _parse_request_module),
0x91: (request_next_module, _parse_request_module),
0x92: (close_module_handle, _parse_close),
0x94: (read_io_map, _parse_read_io_map),
0x95: (write_io_map, _parse_write_io_map),
0x97: (boot, _parse_boot),
0x98: (set_brick_name, _parse_set_brick_name),
0x9B: (get_device_info, _parse_get_device_info),
0xA0: (delete_user_flash, _parse_delete_user_flash),
0xA1: (poll_command_length, _parse_poll_command_length),
0xA2: (poll_command, _parse_poll_command),
0xA4: (bluetooth_factory_reset, _parse_bluetooth_factory_reset),
}
|