/usr/lib/python2.7/dist-packages/ldif3.py is in python-ldif3 3.2.1-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
"""ldif3 - generate and parse LDIF data (see RFC 2849)."""
from __future__ import unicode_literals
import base64
import re
import logging
from collections import OrderedDict
try:  # pragma: nocover
    from urlparse import urlparse
    from urllib import urlopen
except ImportError:  # pragma: nocover
    from urllib.parse import urlparse
    from urllib.request import urlopen
__version__ = '3.2.1'
__all__ = [
    # constants
    'LDIF_PATTERN',
    # classes
    'LDIFWriter',
    'LDIFParser',
]
log = logging.getLogger('ldif3')
ATTRTYPE_PATTERN = r'[\w;.-]+(;[\w_-]+)*'
ATTRVALUE_PATTERN = r'(([^,]|\\,)+|".*?")'
ATTR_PATTERN = ATTRTYPE_PATTERN + r'[ ]*=[ ]*' + ATTRVALUE_PATTERN
RDN_PATTERN = ATTR_PATTERN + r'([ ]*\+[ ]*' + ATTR_PATTERN + r')*[ ]*'
DN_PATTERN = RDN_PATTERN + r'([ ]*,[ ]*' + RDN_PATTERN + r')*[ ]*'
DN_REGEX = re.compile('^%s$' % DN_PATTERN)
LDIF_PATTERN = ('^((dn(:|::) %(DN_PATTERN)s)|(%(ATTRTYPE_PATTERN)'
                's(:|::) .*)$)+' % vars())
MOD_OPS = ['add', 'delete', 'replace']
CHANGE_TYPES = ['add', 'delete', 'modify', 'modrdn']
def is_dn(s):
"""Return True if s is a LDAP DN."""
if s == '':
return True
rm = DN_REGEX.match(s)
return rm is not None and rm.group(0) == s
UNSAFE_STRING_PATTERN = '(^[ :<]|[\000\n\r\200-\377])'
UNSAFE_STRING_RE = re.compile(UNSAFE_STRING_PATTERN)
def lower(l):
"""Return a list with the lowercased items of l."""
return [i.lower() for i in l or []]
class LDIFWriter(object):
"""Write LDIF entry or change records to file object.
:type output_file: file-like object in binary mode
:param output_file: File for output
:type base64_attrs: List[string]
:param base64_attrs: List of attribute types to be base64-encoded in any
case
:type cols: int
:param cols: Specifies how many columns a line may have before it is
folded into many lines
:type line_sep: bytearray
:param line_sep: line separator
:type encoding: string
:param encoding: Encoding to use for converting values to bytes. Note that
the spec requires the dn field to be UTF-8 encoded, so it does not
really make sense to use anything else. Default: ``'utf8'``.
"""
    def __init__(
            self,
            output_file,
            base64_attrs=[],
            cols=76,
            line_sep=b'\n',
            encoding='utf8'):
        self._output_file = output_file
        self._base64_attrs = lower(base64_attrs)
        self._cols = cols
        self._line_sep = line_sep
        self._encoding = encoding
        self.records_written = 0  #: number of records that have been written
    def _fold_line(self, line):
        """Write string line as one or more folded lines."""
        if len(line) <= self._cols:
            self._output_file.write(line)
            self._output_file.write(self._line_sep)
        else:
            pos = self._cols
            self._output_file.write(line[0:self._cols])
            self._output_file.write(self._line_sep)
            while pos < len(line):
                self._output_file.write(b' ')
                end = min(len(line), pos + self._cols - 1)
                self._output_file.write(line[pos:end])
                self._output_file.write(self._line_sep)
                pos = end
    def _needs_base64_encoding(self, attr_type, attr_value):
        """Return True if attr_value has to be base-64 encoded.

        This is the case because of special chars or because attr_type is in
        self._base64_attrs
        """
        return attr_type.lower() in self._base64_attrs or \
            isinstance(attr_value, bytes) or \
            UNSAFE_STRING_RE.search(attr_value) is not None
    def _unparse_attr(self, attr_type, attr_value):
        """Write a single attribute type/value pair."""
        if self._needs_base64_encoding(attr_type, attr_value):
            if not isinstance(attr_value, bytes):
                attr_value = attr_value.encode(self._encoding)
            encoded = base64.encodestring(attr_value)\
                .replace(b'\n', b'')\
                .decode('ascii')
            line = ':: '.join([attr_type, encoded])
        else:
            line = ': '.join([attr_type, attr_value])
        self._fold_line(line.encode('ascii'))
    def _unparse_entry_record(self, entry):
        """
        :type entry: Dict[string, List[string]]
        :param entry: Dictionary holding an entry
        """
        for attr_type in sorted(entry.keys()):
            for attr_value in entry[attr_type]:
                self._unparse_attr(attr_type, attr_value)
    def _unparse_changetype(self, mod_len):
        """Detect and write the changetype."""
        if mod_len == 2:
            changetype = 'add'
        elif mod_len == 3:
            changetype = 'modify'
        else:
            raise ValueError("modlist item of wrong length")
        self._unparse_attr('changetype', changetype)
    def _unparse_change_record(self, modlist):
        """
        :type modlist: List[Tuple]
        :param modlist: List of additions (2-tuple) or modifications (3-tuple)
        """
        mod_len = len(modlist[0])
        self._unparse_changetype(mod_len)
        for mod in modlist:
            if len(mod) != mod_len:
                raise ValueError("Subsequent modlist item of wrong length")
            if mod_len == 2:
                mod_type, mod_vals = mod
            elif mod_len == 3:
                mod_op, mod_type, mod_vals = mod
                self._unparse_attr(MOD_OPS[mod_op], mod_type)
            for mod_val in mod_vals:
                self._unparse_attr(mod_type, mod_val)
            if mod_len == 3:
                self._output_file.write(b'-' + self._line_sep)
    def unparse(self, dn, record):
        """Write an entry or change record to the output file.

        :type dn: string
        :param dn: distinguished name
        :type record: Union[Dict[string, List[string]], List[Tuple]]
        :param record: Either a dictionary holding an entry or a list of
            additions (2-tuple) or modifications (3-tuple).
        """
        self._unparse_attr('dn', dn)
        if isinstance(record, dict):
            self._unparse_entry_record(record)
        elif isinstance(record, list):
            self._unparse_change_record(record)
        else:
            raise ValueError("Argument record must be dictionary or list")
        self._output_file.write(self._line_sep)
        self.records_written += 1
class LDIFParser(object):
"""Read LDIF entry or change records from file object.
:type input_file: file-like object in binary mode
:param input_file: file to read the LDIF input from
:type ignored_attr_types: List[string]
:param ignored_attr_types: List of attribute types that will be ignored
:type process_url_schemes: List[bytearray]
:param process_url_schemes: List of URL schemes to process with urllib.
An empty list turns off all URL processing and the attribute is
ignored completely.
:type line_sep: bytearray
:param line_sep: line separator
:type encoding: string
:param encoding: Encoding to use for converting values to unicode strings.
If decoding failes, the raw bytestring will be used instead. You can
also pass ``None`` which will skip decoding and always produce
bytestrings. Note that this only applies to entry values. ``dn`` and
entry keys will always be unicode strings.
:type strict: boolean
:param strict: If set to ``False``, recoverable parse errors will produce
log warnings rather than exceptions.
"""
    def _strip_line_sep(self, s):
        """Strip trailing line separators from s, but no other whitespaces."""
        if s[-2:] == b'\r\n':
            return s[:-2]
        elif s[-1:] == b'\n':
            return s[:-1]
        else:
            return s
    def __init__(
            self,
            input_file,
            ignored_attr_types=[],
            process_url_schemes=[],
            line_sep=b'\n',
            encoding='utf8',
            strict=True):
        self._input_file = input_file
        self._process_url_schemes = lower(process_url_schemes)
        self._ignored_attr_types = lower(ignored_attr_types)
        self._line_sep = line_sep
        self._encoding = encoding
        self._strict = strict
        self.line_counter = 0  #: number of lines that have been read
        self.byte_counter = 0  #: number of bytes that have been read
        self.records_read = 0  #: number of records that have been read
    def _iter_unfolded_lines(self):
        """Iter input unfolded lines. Skip comments."""
        line = self._input_file.readline()
        while line:
            self.line_counter += 1
            self.byte_counter += len(line)
            line = self._strip_line_sep(line)
            nextline = self._input_file.readline()
            while nextline and nextline[:1] == b' ':
                line += self._strip_line_sep(nextline)[1:]
                nextline = self._input_file.readline()
            if not line.startswith(b'#'):
                yield line
            line = nextline
    def _iter_blocks(self):
        """Iter input lines in blocks separated by blank lines."""
        lines = []
        for line in self._iter_unfolded_lines():
            if line:
                lines.append(line)
            elif lines:
                self.records_read += 1
                yield lines
                lines = []
        if lines:
            self.records_read += 1
            yield lines
    def _decode_value(self, attr_type, attr_value):
        if attr_type == u'dn':
            try:
                return attr_type, attr_value.decode('utf8')
            except UnicodeError as err:
                self._error(err)
                return attr_type, attr_value.decode('utf8', 'ignore')
        elif self._encoding is not None:
            try:
                return attr_type, attr_value.decode(self._encoding)
            except UnicodeError:
                pass
        return attr_type, attr_value
    def _parse_attr(self, line):
        """Parse a single attribute type/value pair."""
        colon_pos = line.index(b':')
        attr_type = line[0:colon_pos].decode('ascii')
        if line[colon_pos:].startswith(b'::'):
            attr_value = base64.decodestring(line[colon_pos + 2:])
        elif line[colon_pos:].startswith(b':<'):
            url = line[colon_pos + 2:].strip()
            attr_value = b''
            if self._process_url_schemes:
                u = urlparse(url)
                if u[0] in self._process_url_schemes:
                    attr_value = urlopen(url.decode('ascii')).read()
        else:
            attr_value = line[colon_pos + 1:].strip()
        return self._decode_value(attr_type, attr_value)
    def _error(self, msg):
        if self._strict:
            raise ValueError(msg)
        else:
            log.warning(msg)
    def _check_dn(self, dn, attr_value):
        """Check dn attribute for issues."""
        if dn is not None:
            self._error('Two lines starting with dn: in one record.')
        if not is_dn(attr_value):
            self._error('No valid string-representation of '
                        'distinguished name %s.' % attr_value)
    def _check_changetype(self, dn, changetype, attr_value):
        """Check changetype attribute for issues."""
        if dn is None:
            self._error('Read changetype: before getting valid dn: line.')
        if changetype is not None:
            self._error('Two lines starting with changetype: in one record.')
        if attr_value not in CHANGE_TYPES:
            self._error('changetype value %s is invalid.' % attr_value)
    def _parse_entry_record(self, lines):
        """Parse a single entry record from a list of lines."""
        dn = None
        entry = OrderedDict()
        for line in lines:
            attr_type, attr_value = self._parse_attr(line)
            if attr_type == 'dn':
                self._check_dn(dn, attr_value)
                dn = attr_value
            elif attr_type == 'version' and dn is None:
                pass  # version = 1
            else:
                if dn is None:
                    self._error('First line of record does not start '
                                'with "dn:": %s' % attr_type)
                if attr_value is not None and \
                        attr_type.lower() not in self._ignored_attr_types:
                    if attr_type in entry:
                        entry[attr_type].append(attr_value)
                    else:
                        entry[attr_type] = [attr_value]
        return dn, entry
    def parse(self):
        """Iterate LDIF entry records.

        :rtype: Iterator[Tuple[string, Dict]]
        :return: (dn, entry)
        """
        for block in self._iter_blocks():
            yield self._parse_entry_record(block)
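For reference, here is a minimal usage sketch of the module's two public classes. It is not part of the file above, and the file names input.ldif and output.ldif are hypothetical placeholders.

# Minimal sketch, assuming 'input.ldif' and 'output.ldif' are placeholder
# file names; both files are opened in binary mode, as the class docstrings
# of LDIFParser and LDIFWriter require.
from ldif3 import LDIFParser, LDIFWriter

with open('input.ldif', 'rb') as in_file, open('output.ldif', 'wb') as out_file:
    parser = LDIFParser(in_file)
    writer = LDIFWriter(out_file)
    for dn, entry in parser.parse():   # yields one (dn, OrderedDict) per record
        writer.unparse(dn, entry)      # writes the entry record back out as LDIF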