/usr/lib/python3/dist-packages/provisioningserver/monkey.py is in python3-maas-provisioningserver 2.4.0~beta2-6865-gec43e47e6-0ubuntu1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
# Copyright 2015-2016 Canonical Ltd. This software is licensed under the
# GNU Affero General Public License version 3 (see the file LICENSE).
"""
Monkey patch for the MAAS provisioning server, with code for rack and region
server patching.
"""
# Public entry points of this module; each one applies a group of the
# monkey patches defined in this file.
__all__ = [
    "add_patches_to_txtftp",
    "add_patches_to_twisted",
]
def add_term_error_code_to_tftp():
    """Register TFTP error code 8 (introduced by RFC 2347) with tx-tftp.

    This applies by hand the fix that landed in python-tx-tftp as
    https://github.com/shylent/python-tx-tftp/pull/20

    The library's error table is only written to when it has no message
    (or a `None` message) for code 8.
    """
    from tftp.datagram import errors as known_errors

    term_code = 8
    if known_errors.get(term_code) is not None:
        # The installed library already describes this error code.
        return
    known_errors[term_code] = "Terminate transfer due to option negotiation"
def fix_tftp_requests():
    """Make tx-tftp choose IPv4 vs IPv6 sensibly when creating a session.

    `tftp.protocol.TFTP._startSession` is replaced with a variant that
    looks at the remote address (``addr[0]``) and passes a matching
    interface to ``listenUDP``.

    See https://bugs.launchpad.net/ubuntu/+source/python-tx-tftp/1614581
    """
    import tftp.protocol
    from netaddr import IPAddress
    from tftp.bootstrap import (
        RemoteOriginReadSession,
        RemoteOriginWriteSession,
    )
    from tftp.datagram import (
        ERR_ACCESS_VIOLATION,
        ERR_FILE_EXISTS,
        ERR_FILE_NOT_FOUND,
        ERR_ILLEGAL_OP,
        ERR_NOT_DEFINED,
        ERRORDatagram,
        OP_RRQ,
        OP_WRQ,
    )
    from tftp.errors import (
        AccessViolation,
        BackendError,
        FileExists,
        FileNotFound,
        Unsupported,
    )
    from tftp.netascii import NetasciiReceiverProxy, NetasciiSenderProxy
    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks, returnValue
    from twisted.python.context import call

    @inlineCallbacks
    def new_startSession(self, datagram, addr, mode):
        """Open the backend file and start a read or write session.

        Fires with the new session for RRQ/WRQ datagrams. When the
        backend raises one of the handled errors, an ERROR datagram is
        sent to `addr` instead and no session is created.
        """

        def send_error(code, *detail):
            # Reply to the requester with a TFTP ERROR datagram.
            packet = ERRORDatagram.from_code(code, *detail)
            self.transport.write(packet.to_wire(), addr)

        # A call context lets interested backends receive extra, arbitrary
        # information without growing the call signature or introducing a
        # request object.
        context = {}
        if self.transport is not None:
            # Expose the local and remote addresses through the context.
            here = self.transport.getHost()
            context.update(local=(here.host, here.port), remote=addr)

        opcode = datagram.opcode
        try:
            if opcode == OP_WRQ:
                fs_interface = yield call(
                    context, self.backend.get_writer, datagram.filename)
            elif opcode == OP_RRQ:
                fs_interface = yield call(
                    context, self.backend.get_reader, datagram.filename)
        except Unsupported as e:
            send_error(
                ERR_ILLEGAL_OP, u"{}".format(e).encode("ascii", "replace"))
        except AccessViolation:
            send_error(ERR_ACCESS_VIOLATION)
        except FileExists:
            send_error(ERR_FILE_EXISTS)
        except FileNotFound:
            send_error(ERR_FILE_NOT_FOUND)
        except BackendError as e:
            send_error(
                ERR_NOT_DEFINED, u"{}".format(e).encode("ascii", "replace"))
        else:
            # Listen on the wildcard address of the requester's family.
            iface = '::' if IPAddress(addr[0]).version == 6 else ''
            if opcode == OP_WRQ:
                wrap_netascii = NetasciiReceiverProxy
                make_session = RemoteOriginWriteSession
            elif opcode == OP_RRQ:
                wrap_netascii = NetasciiSenderProxy
                make_session = RemoteOriginReadSession
            else:
                # Neither a read nor a write request: nothing to start.
                make_session = None
            if make_session is not None:
                if mode == b'netascii':
                    fs_interface = wrap_netascii(fs_interface)
                session = make_session(
                    addr, fs_interface, datagram.options,
                    _clock=self._clock)
                reactor.listenUDP(0, session, iface)
                returnValue(session)

    tftp.protocol.TFTP._startSession = new_startSession
def get_patched_URI():
    """Create the patched `twisted.web.client.URI` to handle IPv6.

    :return: A subclass of `twisted.web.client.URI` whose `fromBytes`
        understands bracketed IPv6 network locations.
    """
    import re
    from twisted.web import http
    from twisted.web.client import URI

    # Matches "[<ipv6>]" optionally followed by ":<port>". Compiled once
    # here rather than on every call to `fromBytes`.
    ipv6_netloc = re.compile(
        b'\\[(?P<host>[0-9A-Fa-f:.]+)\\]([:](?P<port>[0-9]+))?$')

    class PatchedURI(URI):

        @classmethod
        def fromBytes(cls, uri, defaultPort=None):
            """Patched replacement for `twisted.web.client._URI.fromBytes`.

            The Twisted version of this function breaks when you give it a
            URL whose netloc is based on an IPv6 address.

            :param uri: The URI to parse, as bytes.
            :param defaultPort: Port to use when the URI has none, or when
                the port given is not an integer. When `None` it is
                derived from the scheme: 443 for https, otherwise 80.
            :return: An instance of `cls`. For a bracketed IPv6 netloc the
                `host` argument is passed without the brackets.
            :raise ValueError: If the netloc contains ``[`` but is not a
                well-formed ``[address]`` or ``[address]:port``.
            """
            uri = uri.strip()
            scheme, netloc, path, params, query, fragment = (
                http.urlparse(uri))

            if defaultPort is None:
                scheme_ports = {b'https': 443, b'http': 80}
                defaultPort = scheme_ports.get(scheme, 80)

            if b'[' in netloc:
                # IPv6 address. This is complicated.
                parsed_netloc = ipv6_netloc.match(netloc)
                if parsed_netloc is None:
                    # Without this check the failure would surface as an
                    # opaque AttributeError on `None.group`.
                    raise ValueError(
                        "Malformed bracketed (IPv6) netloc: %r" % (netloc,))
                host, port = parsed_netloc.group('host', 'port')
            elif b':' in netloc:
                # IPv4 address or hostname, with port spec. This is easy.
                host, port = netloc.split(b':')
            else:
                # IPv4 address or hostname, without port spec.
                # This is trivial.
                host = netloc
                port = None

            if port is None:
                port = defaultPort
            try:
                port = int(port)
            except ValueError:
                # An empty or non-numeric port falls back to the default.
                port = defaultPort

            return cls(
                scheme, netloc, host, port, path, params, query, fragment)

    return PatchedURI
def fix_twisted_web_client_URI():
    """Install the IPv6-capable URI class into `twisted.web.client`.

    The class replaces `_URI` when the module has that attribute, and
    `URI` otherwise.
    """
    from twisted.web import client as web_client

    replacement = get_patched_URI()
    target = "_URI" if hasattr(web_client, "_URI") else "URI"
    setattr(web_client, target, replacement)
def fix_twisted_web_http_Request():
    """Add ipv6 support to Request.getClientIP()

    Specifically, IPv6 IP addresses need to be wrapped in [], and return
    address.IPv6Address when needed.

    Each replacement method is only installed on
    `twisted.web.http.Request` when a probe request shows that the
    installed Twisted mishandles the IPv6 test address ``fe80::1``.

    See https://bugs.launchpad.net/ubuntu/+source/twisted/+bug/1604608
    """
    from netaddr import IPAddress
    from netaddr.core import AddrFormatError
    from twisted.internet import address
    from twisted.python.compat import (
        intToBytes,
        networkString,
    )
    import twisted.web.http
    from twisted.web.server import Request
    from twisted.web.test.requesthelper import DummyChannel

    def new_getClientIP(self):
        """Return the client's host for IPv4 and IPv6 clients, else None."""
        # upstream doesn't check for address.IPv6Address
        if isinstance(
                self.client, (address.IPv4Address, address.IPv6Address)):
            return self.client.host
        return None

    def new_getRequestHostname(self):
        """Return the requested hostname as bytes, without any port.

        The ``Host`` header is preferred. Without one, the host of
        `self.getHost()` is used, wrapped in [] when it is an IPv6
        address.
        """
        # Unlike upstream, support/require IPv6 addresses to be
        # [ip:v6:add:ress]:port, with :port being optional.
        # IPv6 IP addresses are wrapped in [], to disambiguate port numbers.
        host = self.getHeader(b'host')
        if host:
            if host.startswith(b'[') and b']' in host:
                if host.find(b']') < host.rfind(b':'):
                    # The format is: [ip:add:ress]:port.
                    return host[:host.rfind(b':')]
                else:
                    # no :port after [...]
                    return host
            # No brackets, so it must be host:port or IPv4:port.
            return host.split(b':', 1)[0]

        host = self.getHost().host
        # `new_setHost` stores the host it was given as bytes, so both
        # text and bytes hosts have to be handled here. networkString()
        # only accepts text, hence bytes are passed through untouched.
        is_text = isinstance(host, str)
        try:
            ip = IPAddress(host if is_text else host.decode("idna"))
        except AddrFormatError:
            # If we could not convert the hostname to an IPAddress, assume
            # that it is a hostname.
            ip = None
        host_bytes = networkString(host) if is_text else host
        if ip is None or ip.version == 4:
            return host_bytes
        return b'[' + host_bytes + b']'

    def new_setHost(self, host, port, ssl=0):
        """Set the request's host, port and ``Host`` header.

        :param host: Hostname or IP address, as bytes.
        :param port: Port number.
        :param ssl: Truthy to force the request to be treated as secure.
        """
        try:
            ip = IPAddress(host.decode("idna"))
        except AddrFormatError:
            ip = None  # `host` is a host or domain name.
        self._forceSSL = ssl  # set first so isSecure will work
        if self.isSecure():
            default = 443
        else:
            default = 80
        if ip is None:
            hostHeader = host
        elif ip.version == 4:
            hostHeader = host
        else:
            # IPv6 literals are bracketed so a port can follow them.
            hostHeader = b"[" + host + b"]"
        if port != default:
            hostHeader += b":" + intToBytes(port)
        self.requestHeaders.setRawHeaders(b"host", [hostHeader])
        if ip is None:
            # Pretend that a host or domain name is an IPv4 address.
            self.host = address.IPv4Address("TCP", host, port)
        elif ip.version == 4:
            self.host = address.IPv4Address("TCP", host, port)
        else:
            self.host = address.IPv6Address("TCP", host, port)

    # Probe the installed Twisted with a throwaway request so that only
    # the methods which are actually broken get replaced.
    request = Request(DummyChannel(), False)
    request.client = address.IPv6Address('TCP', 'fe80::1', 80)
    request.setHost(b"fe80::1", 1234)
    if request.getClientIP() is None:
        # Buggy code returns None for IPv6 addresses.
        twisted.web.http.Request.getClientIP = new_getClientIP
    if isinstance(request.host, address.IPv4Address):
        # Buggy code calls fe80::1 an IPv4Address.
        twisted.web.http.Request.setHost = new_setHost
    if request.getRequestHostname() == b'fe80':
        # The fe80::1 test address above was incorrectly interpreted as
        # address='fe80', port = ':1', because it does host.split(':', 1)[0].
        twisted.web.http.Request.getRequestHostname = new_getRequestHostname
def fix_twisted_web_server_addressToTuple():
    """Teach `twisted.web.server._addressToTuple` about IPv6 addresses.

    The replacement is only installed when the existing function raises
    `TypeError` for an `address.IPv6Address`.

    See https://bugs.launchpad.net/ubuntu/+source/twisted/+bug/1604608
    """
    from twisted.internet import address
    from twisted.web import server as web_server

    def new_addressToTuple(addr):
        """Convert an address object into a plain tuple."""
        inet_families = (
            (address.IPv4Address, 'INET'),
            (address.IPv6Address, 'INET6'),
        )
        for addr_type, family in inet_families:
            if isinstance(addr, addr_type):
                return (family, addr.host, addr.port)
        if isinstance(addr, address.UNIXAddress):
            return ('UNIX', addr.name)
        return tuple(addr)

    # Try the existing implementation against an IPv6 address first.
    probe = address.IPv6Address("TCP", "fe80::1", '80')
    try:
        web_server._addressToTuple(probe)
    except TypeError:
        web_server._addressToTuple = new_addressToTuple
def fix_twisted_internet_tcp():
    """Replace `twisted.internet.tcp._resolveIPv6` with a tolerant variant.

    The replacement strips any square brackets surrounding the address
    before resolving it.

    See https://bugs.launchpad.net/ubuntu/+source/twisted/+bug/1604608
    """
    import socket
    import twisted.internet.tcp
    from twisted.internet.tcp import _NUMERIC_ONLY

    def new_resolveIPv6(ip, port):
        """Return the socket address of the first match for `ip`/`port`."""
        # Drop the brackets around the address, when there are any.
        bare_ip = ip.strip('[]')
        matches = socket.getaddrinfo(
            bare_ip, port, 0, 0, 0, _NUMERIC_ONLY)
        first_match = matches[0]
        # The fifth element of a getaddrinfo() entry is the sockaddr.
        return first_match[4]

    twisted.internet.tcp._resolveIPv6 = new_resolveIPv6
def augment_twisted_deferToThreadPool():
    """Ensure functions deferred to a thread are wrapped in `synchronous`.

    `twisted.internet.threads.deferToThreadPool` is replaced unless the
    function currently installed there was defined in this module.
    """
    from provisioningserver.utils.twisted import ISynchronous, synchronous
    from twisted.internet import threads

    original_deferToThreadPool = threads.deferToThreadPool

    def new_deferToThreadPool(reactor, threadpool, f, *args, **kwargs):
        """Variant of Twisted's that wraps all functions in `synchronous`."""
        if not ISynchronous.providedBy(f):
            f = synchronous(f)
        return original_deferToThreadPool(
            reactor, threadpool, f, *args, **kwargs)

    already_patched = threads.deferToThreadPool.__module__ == __name__
    if not already_patched:
        threads.deferToThreadPool = new_deferToThreadPool
def add_patches_to_txtftp():
    """Apply, in order, every python-tx-tftp patch from this module."""
    patches = (
        add_term_error_code_to_tftp,
        fix_tftp_requests,
    )
    for apply_patch in patches:
        apply_patch()
def add_patches_to_twisted():
    """Apply, in order, every Twisted patch from this module."""
    patches = (
        fix_twisted_web_client_URI,
        fix_twisted_web_http_Request,
        fix_twisted_web_server_addressToTuple,
        fix_twisted_internet_tcp,
        augment_twisted_deferToThreadPool,
    )
    for apply_patch in patches:
        apply_patch()
|