/usr/share/pyshared/cogent/util/misc.py is in python-cogent 1.5.3-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
#!/usr/bin/env python
"""Generally useful utility classes and methods.
"""
import types
import sys
from time import clock
from datetime import datetime
from string import maketrans, strip
from random import randrange, choice, randint
from sys import maxint
from os import popen, remove, makedirs, getenv
from os.path import join, abspath, exists, isdir
from numpy import logical_not, sum
from cPickle import dumps, loads
from gzip import GzipFile
import hashlib
# import parse_command_line_parameters for backward compatibility
from cogent.util.option_parsing import parse_command_line_parameters
__author__ = "Rob Knight"
__copyright__ = "Copyright 2007-2012, The Cogent Project"
__credits__ = ["Rob Knight", "Peter Maxwell", "Amanda Birmingham",
"Sandra Smit", "Zongzhi Liu", "Daniel McDonald",
"Kyle Bittinger", "Marcin Cieslik"]
__license__ = "GPL"
__version__ = "1.5.3"
__maintainer__ = "Rob Knight"
__email__ = "rob@spot.colorado.edu"
__status__ = "Production"
def safe_md5(open_file, block_size=2**20):
    """Return an md5 object updated with the full contents of open_file.

    Reads the file in block_size-byte chunks so the whole file is never
    held in memory at once.  Based on the approach discussed at:
    http://stackoverflow.com/questions/1131220/get-md5-hash-of-a-files-without-open-it-in-python
    """
    digest = hashlib.md5()
    while True:
        chunk = open_file.read(block_size)
        if not chunk:
            break
        digest.update(chunk)
    return digest
def identity(x):
    """Return x unchanged.

    Handy as a default callback, avoiding special handling for None.
    """
    return x
def if_(test, true_result, false_result):
    """Return true_result when test is true, otherwise false_result.

    Unlike a real if/else, both results are already evaluated by the
    caller, so this cannot short-circuit: don't use it if either branch
    might fail.  The branches must be expressions, not statements (so
    print and raise will not work, for example).
    """
    return true_result if test else false_result
def iterable(item):
    """Return item if it supports iteration, otherwise [item].

    Guarantees the caller gets something it can iterate over.
    """
    try:
        iter(item)
    except TypeError:
        return [item]
    return item
def max_index(items):
    """Return the index of the largest item in any sequence.

    Ties go to the latest (highest-index) occurrence, because the
    (value, position) pairs compare by position when values are equal.
    """
    best = max((value, position) for position, value in enumerate(items))
    return best[1]
def min_index(items):
    """Return the index of the smallest item in any sequence.

    Ties go to the earliest (lowest-index) occurrence.
    """
    worst = min((value, position) for position, value in enumerate(items))
    return worst[1]
def flatten(items):
    """Remove one level of nesting from items.

    items can be any sequence, but flatten always returns a list.
    Iterable elements (including strings) are expanded one level;
    non-iterable elements are kept as-is.
    """
    result = []
    for item in items:
        try:
            result.extend(item)
        except TypeError:
            # item isn't iterable: keep it as a single element.
            # (Previously a bare except, which also swallowed unrelated
            # errors such as KeyboardInterrupt.)
            result.append(item)
    return result
class DepthExceededError(Exception):
    """Raised when recursion goes deeper than an allowed max_depth."""
    # Used as an internal control-flow signal by recursive_flatten_old:
    # the caller's bare except catches it and stops expanding that item.
    pass
def deep_list(x):
    """Recursively convert a (possibly nested) tuple into a list.

    Anything that is not a tuple is returned unchanged.
    """
    if isinstance(x, tuple):
        # A comprehension always yields a list (map only does so under
        # Python 2), and it reads more clearly.
        return [deep_list(item) for item in x]
    return x
def deep_tuple(x):
    """Recursively convert a (possibly nested) list into a tuple.

    Anything that is not a list is returned unchanged.
    """
    if not isinstance(x, list):
        return x
    return tuple(deep_tuple(item) for item in x)
def between((min_, max_), number):
    """Same as: min_ <= number <= max_.

    NOTE: uses Python 2 tuple-parameter unpacking; the first argument
    must be a (min_, max_) pair.
    """
    return min_ <= number <= max_
def combinate(items, n):
    """Yield each n-item combination of items, as a list.

    Combinations preserve the original order of items.  n == 0 yields a
    single empty list.  (Uses range rather than the Python-2-only
    xrange; iteration behavior is identical.)
    """
    if n == 0:
        yield []
        return
    for start in range(len(items) - n + 1):
        # fix items[start], then recurse over the remaining tail
        for tail in combinate(items[start + 1:], n - 1):
            yield [items[start]] + tail
def gzip_dump(object, filename, bin=2):
    """Pickle object (using protocol bin) and write it, gzipped, to filename."""
    out = GzipFile(filename, 'wb')
    out.write(dumps(object, bin))
    try:
        # give the object a chance to detach linked structures
        object.link()
    except AttributeError:
        # object has no link() method -- nothing to clean up
        pass
    out.close()
def gzip_load(filename):
    """Load and return a pickled object from the gzip file at filename."""
    infile = GzipFile(filename, 'rb')
    chunks = []
    while True:
        data = infile.read()
        # Robust EOF test: read() returns a falsy empty value at EOF.
        # The old comparison against the literal '' would never match a
        # bytes result, looping forever on any bytes-returning stream.
        if not data:
            break
        chunks.append(data)
    result = loads(b"".join(chunks))
    infile.close()
    return result
def recursive_flatten_old(items, max_depth=None, curr_depth=0):
    """Removes all nesting from items, recursively.

    Note: Default max_depth is None, which removes all nesting (including
    unpacking strings). Setting max_depth unpacks a maximum of max_depth levels
    of nesting, but will not raise exception if the structure is not really
    that deep (instead, will just remove the nesting that exists). If max_depth
    is 0, will not remove any nesting (note difference from setting max_depth
    to None).
    """
    #bail out if greater than max_depth
    if max_depth is not None:
        if curr_depth > max_depth:
            # caught by the bare except in the recursive caller's frame,
            # which then appends the item unexpanded instead of recursing
            raise DepthExceededError
    result = []
    for i in items:
        try:
            # NOTE: delegates to the newer recursive_flatten (not _old);
            # positional args line up with (items, max_depth, curr_depth)
            result.extend(recursive_flatten(i, max_depth, curr_depth + 1))
        except:
            # i was not iterable, or DepthExceededError from the guard above
            result.append(i)
    return result
def curry(f, *a, **kw):
    """Pre-bind leading arguments of f: curry(f, x)(y) == f(x, y).

    Keyword arguments are merged, with later calls overriding the bound
    ones.  Modified from the Python Cookbook.
    """
    def curried(*more_a, **more_kw):
        merged_kw = dict(kw, **more_kw)
        return f(*(a + more_a), **merged_kw)
    # build a readable docstring describing the pre-bound arguments
    curry_params = [str(arg) for arg in a]
    curry_params.extend('%s=%s' % (name, value) for name, value in kw.items())
    try:
        f_name = f.func_name
    except: #some callables (e.g. itertools.groupby) lack func_name
        f_name = '?'
    curried.__doc__ = ' curry(%s,%s)\n'\
        '== curried from %s ==\n %s'\
        % (f_name, ', '.join(curry_params), f_name, f.__doc__)
    return curried
#end curry
def is_iterable(obj):
    """Return True if obj supports iteration (i.e. iter(obj) succeeds)."""
    try:
        iter(obj)
    except TypeError:
        # The old `except TypeError, e` bound an exception name that was
        # never used, with Python-2-only syntax; dropping it is cleaner
        # and version-portable.
        return False
    else:
        return True
def is_char(obj):
    """Return True if obj is a string of length <= 1 (a 'char')."""
    if not isinstance(obj, basestring):
        return False
    return len(obj) <= 1
def is_char_or_noniterable(x):
    """Return True if x is a char (string, len <= 1) or not iterable."""
    # was a named lambda; a def is the idiomatic form (PEP 8) and gives
    # the callable a useful __name__ and docstring
    return is_char(x) or not is_iterable(x)

def is_str_or_noniterable(x):
    """Return True if x is any string, or is not iterable at all."""
    return isinstance(x, basestring) or not is_iterable(x)
def recursive_flatten(items, max_depth=None, curr_depth=1,
    is_leaf=is_char_or_noniterable):
    """Remove all nesting from items, recursively.

    max_depth: None (the default) removes every level of nesting,
    including unpacking strings.  A positive max_depth unpacks at most
    that many levels, without complaining if the structure is shallower.
    A max_depth of 0 removes no nesting at all (note the difference from
    None).

    is_leaf: predicate deciding what counts as a leaf node.  The default
    is_char_or_noniterable removes all nesting; is_str_or_noniterable
    keeps strings intact; not_list_tuple unpacks only lists and tuples,
    which is considerably faster and recommended for general use.
    """
    result = []
    for element in items:
        hit_depth_limit = max_depth is not None and curr_depth > max_depth
        if hit_depth_limit or is_leaf(element):
            result.append(element)
        else:
            result.extend(recursive_flatten(element,
                max_depth, curr_depth + 1, is_leaf))
    return result
#end recursive_flatten
def not_list_tuple(obj):
    """Return True for anything that is neither a list nor a tuple."""
    is_sequence = isinstance(obj, (list, tuple))
    return not is_sequence
#recursive_flatten specialized to unpack only lists/tuples (much faster)
list_flatten = curry(recursive_flatten, is_leaf=not_list_tuple)
def unflatten(data, row_width, keep_extras=False):
    """Convert items in data into a list of row_width-length lists.

    row_width must be an integer >= 1; raises ValueError otherwise.

    data can be any sequence type, but results will always be lists at
    the first level (including the common case of a list containing one
    sequence).  Unless keep_extras is True, items left over after the
    last complete row are discarded, so len(data) need not be divisible
    by row_width.

    This function does _not_ reverse the effect of zip, since the lists
    it produces are not interleaved: treated as a 2-d array, its
    transpose would be the reverse of zip's effect.
    """
    if row_width < 1:
        # portable call-style raise instead of the py2-only statement form
        raise ValueError("unflatten: row_width must be at least 1.")
    result = []
    num_items = len(data)
    # explicit floor division: the old `/` relied on Python 2 int semantics
    slices = num_items // row_width
    for s in range(slices):
        result.append(data[s * row_width:(s + 1) * row_width])
    if keep_extras:
        last_slice = data[slices * row_width:]
        if last_slice:
            result.append(last_slice)
    return result
def unzip(items):
    """Perform the reverse of zip: produce separate lists from tuples.

    items should be a list of k-element tuples; raises an exception if
    any tuple contains more items than the first one.  Conceptually
    equivalent to transposing the matrix of tuples.

    Returns a list of lists where the ith list contains the ith element
    of each tuple.  Note: zip expects *items rather than items, so
    unzip(zip(*items)) compares equal to items.  Always returns lists,
    but accepts any sequence.
    """
    if not items:
        return []
    # comprehension instead of map(list, ...): always yields a list,
    # independent of Python 2's map-returns-list behavior
    return [list(group) for group in zip(*items)]
def select(order, items):
    """Return the elements of items named by order, a list of keys.

    order must contain valid keys into items (indices for sequences,
    keys for mappings), so this can also emulate Perl's hash slices.

    Example: vowels = select([0, 4, 8], 'abcdefghijklm')
    Example: freqs = select('ea', {'a':1,'b':5,'c':2,'d':4,'e':6})

    Always returns a list of whatever type the elements of items are.
    """
    # comprehension instead of map(items.__getitem__, ...): the list
    # result no longer depends on Python 2's map semantics
    return [items[key] for key in order]
def sort_order(items, cmpfunc=None):
    """Return the indices that would sort items in ascending order.

    Looking up items[i] for each i in the returned list gives the items
    sorted ascending.  Useful for taking just the n best items, etc.
    cmpfunc, if given, is passed to list.sort as a comparison function.
    """
    decorated = [(value, position) for position, value in enumerate(items)]
    if cmpfunc is None:
        decorated.sort()
    else:
        decorated.sort(cmpfunc)
    return [position for _, position in decorated]
def find_all(text, pat):
    """Return a sorted list of the start indices of every (possibly
    overlapping) occurrence of pat in text."""
    matches = []
    search_from = 0
    while True:
        try:
            hit = text.index(pat, search_from)
        except ValueError:  # no more matches
            return matches
        matches.append(hit)
        # advance only one position so overlapping matches are found
        search_from = hit + 1
def find_many(text, pats):
    """Return sorted indices of every occurrence of every pattern in text.

    Matches to all patterns are merged into one sorted list.

    WARNING: if pats is a single string, its characters are searched for
    individually.  If you want to search for the whole string, use
    find_all instead, or wrap the string in a 1-item list.
    """
    hits = []
    for pat in pats:
        hits += find_all(text, pat)
    return sorted(hits)
def unreserve(item):
    """Strip one trailing underscore from item, if it has one.

    Useful for mapping mangled Python reserved words (e.g. 'class_')
    back to their original form.  Non-strings pass through unchanged.
    """
    try:
        if item.endswith('_'):
            return item[:-1]
    except AttributeError:
        pass  # not string-like: return unchanged
    return item
def add_lowercase(d):
    """Adds lowercase version of keys in d to itself. Converts vals as well.

    Should work on sequences of strings as well as strings.
    Now also works on strings and sets.  Mutates and returns d for
    dict-like inputs; returns a new object for strings and sequences.
    """
    if hasattr(d, 'lower'): #behaves like a string
        return d + d.lower()
    elif not hasattr(d, 'items'): #not a dict
        items = list(d)
        return d.__class__(items + [i.lower() for i in items])
    #otherwise, assume dict-like behavior
    # NOTE(review): d.items() is a list under Python 2, so inserting new
    # keys into d inside this loop is safe here; it would break on a
    # Python 3 dict view -- confirm before porting.
    for key, val in d.items():
        try:
            new_key = key.lower()
        except: #try to make tuple out of arbitrary sequence
            try:
                new_key = []
                for k in key:
                    try:
                        new_key.append(k.lower())
                    except:
                        # element has no lower(); keep it as-is
                        new_key.append(k)
                new_key = tuple(new_key)
            except:
                # key wasn't even iterable: leave it unchanged
                new_key = key
        try:
            new_val = val.lower()
        except:
            new_val = val #don't care if we couldn't convert it
        if new_key not in d: #don't overwrite existing lcase keys
            d[new_key] = new_val
    return d
def extract_delimited(line, left, right, start_index=0):
    """Return the text between the first left and right delimiters in line.

    Optionally begins searching at start_index.  Returns None if the
    left delimiter is absent; raises ValueError if the left delimiter is
    found without a matching right delimiter; raises TypeError if the
    two delimiters are equal.

    Note: finds the next complete field (i.e. if we start inside an
    incomplete field, it is skipped and the next one is used).
    """
    if left == right:
        # portable call-style raise instead of the py2-only statement form
        raise TypeError(
            "extract_delimited is for fields w/ different left and right delimiters")
    try:
        field_start = line.index(left, start_index)
    except ValueError: #no such field
        return None
    try:
        field_end = line.index(right, field_start)
    except ValueError: #left but no right delimiter: raise error
        raise ValueError(
            "Found '%s' but not '%s' in line %s, starting at %s."
            % (left, right, line, start_index))
    #found both delimiters: return the content between them
    return line[field_start + 1:field_end]
def caps_from_underscores(string):
    """Convert words_with_underscores to CapWords."""
    return ''.join(word.title() for word in string.split('_'))
def InverseDict(d):
    """Return the inverse of d: values become keys and vice versa.

    Note: if more than one key has the same value, an arbitrary key wins
    (the last one encountered during iteration overwrites the others).

    Can be invoked with anything acceptable to dict(), including an
    existing dict or a list of tuples; keys are inserted in arbitrary
    order rather than input order.

    WARNING: fails if any values are unhashable, e.g. dicts or lists.
    """
    if isinstance(d, dict):
        temp = d
    else:
        temp = dict(d)
    # items() behaves identically to the py2-only iteritems() here and
    # exists on every dict implementation
    return dict([(val, key) for key, val in temp.items()])
def InverseDictMulti(d):
    """Return the inverse of d, mapping each value to a list of its keys.

    Each value in the result is _always_ a list, even for a single key.

    Can be invoked with anything acceptable to dict(), including an
    existing dict or a list of tuples; keys are appended in arbitrary
    order, not input order.

    WARNING: fails if any values are unhashable, e.g. dicts or lists.
    """
    if isinstance(d, dict):
        temp = d
    else:
        temp = dict(d)
    result = {}
    # items() behaves identically to the py2-only iteritems() here;
    # setdefault replaces the explicit membership test
    for key, val in temp.items():
        result.setdefault(val, []).append(key)
    return result
def DictFromPos(seq):
    """Map each item in seq to the list of positions where it occurs.

    Values are always lists, even for items that appear only once.

    WARNING: fails if items in seq are unhashable, e.g. if seq is a
    list of lists.
    """
    positions = {}
    for index, item in enumerate(seq):
        positions.setdefault(item, []).append(index)
    return positions
def DictFromFirst(seq):
    """Map each item to the index of its first occurrence in seq.

    WARNING: fails if items in seq are unhashable, e.g. if seq is a
    list of lists.
    """
    firsts = {}
    for index, item in enumerate(seq):
        # setdefault keeps the earliest index for repeated items
        firsts.setdefault(item, index)
    return firsts
def DictFromLast(seq):
    """Map each item to the index of its last occurrence in seq.

    WARNING: fails if items in seq are unhashable, e.g. if seq is a
    list of lists.
    """
    lasts = {}
    for index, item in enumerate(seq):
        # later indices overwrite earlier ones, leaving the last
        lasts[item] = index
    return lasts
def DistanceFromMatrix(matrix):
    """Return a function f(i, j) that looks up matrix[i][j].

    Keeps callers flexible about whether a distance is computed on the
    fly or looked up in a table.  matrix may be a 2D dict (arbitrary
    keys) or a list of lists (integer keys).
    """
    def lookup(i, j):
        return matrix[i][j]
    return lookup
def PairsFromGroups(groups):
    """Return a dict whose key (i, j) exists iff i and j share a group.

    groups must be a sequence of sequences, e.g. a list of strings.
    All values are None: membership of the key is the only signal.
    """
    pairs = {}
    for group in groups:
        for first in group:
            for second in group:
                pairs[(first, second)] = None
    return pairs
class ClassChecker(object):
    """Container of classes: 'if t in x' is True if t is the right class.

    Membership tests use isinstance against the classes supplied at
    construction time.
    """

    def __init__(self, *Classes):
        """Return a new ClassChecker that accepts the specified classes.

        Raises TypeError if any argument is not a type object.
        """
        type_type = type(str)
        for c in Classes:
            # deliberately an exact type() comparison, not isinstance:
            # preserves the original acceptance rules exactly
            if type(c) != type_type:
                # portable call-style raise instead of py2-only statement
                raise TypeError(
                    "ClassChecker found non-type object '%s' in parameter list." % c)
        self.Classes = list(Classes)

    def __contains__(self, item):
        """Return True if item is an instance of any accepted class."""
        for c in self.Classes:
            if isinstance(item, c):
                return True
        return False

    def __str__(self):
        """Informal string representation: the list of accepted classes."""
        return str(self.Classes)
class Delegator(object):
    """Mixin class that forwards unknown attributes to a specified object.

    Handles properties correctly (this was somewhat subtle).

    WARNING: If you are delegating to an object that pretends to have every
    attribute (e.g. a MappedRecord), you _must_ bypass normal attribute access
    in __init__ of your subclasses to ensure that the properties are set in
    the object itself, not in the object to which it delegates. Alternatively,
    you can initialize with None so that unhandled attributes are set in self,
    and then replace self._handler with your object right at the end of
    __init__. The first option is probably safer and more general.

    Warning: will not work on classes that use __slots__ instead of __dict__.
    """
    def __init__(self, obj):
        """Returns a new Delegator that uses methods of obj.

        NOTE: It's important that this bypasses the normal attribute setting
        mechanism, or there's an infinite loop between __init__ and
        __setattr__. However, subclasses should be able to use the normal
        mechanism with impunity.
        """
        # write through __dict__ so our own __setattr__ is never invoked
        self.__dict__['_handler'] = obj

    def __getattr__(self, attr):
        """Forwards unhandled attributes to self._handler.

        Sets _handler to None on first use if not already set.
        """
        # setdefault guards against lookup before __init__ has run
        handler = self.__dict__.setdefault('_handler', None)
        return getattr(handler, attr)

    def __setattr__(self, attr, value):
        """Forwards requests to change unhandled attributes to self._handler.

        This logic is rather complicated because of GenericRecord objects,
        which masquerade as having every attribute, which can be used as
        handlers for Delegators, which forward all requests to their handlers.

        Consequently, we need to check the following:

            1. Is attr in the object's __dict__? If so, set it in self.
            2. Does the handler have attr? If so, try to set it in handler.
            3. Does self lack the attr? If so, try to set it in handler.
            4. Did setting attr in the handler fail? If so, set it in self.
        """
        #if we're setting _handler, set it in dict directly (but complain if
        #it's self).
        if attr == '_handler':
            if value is self:
                raise ValueError, "Can't set object to be its own handler."
            self.__dict__['_handler'] = value
            return
        #check if the attribute is in this object's dict
        elif attr in self.__dict__:
            return object.__setattr__(self, attr, value)
        #then check if the class knows about it
        elif hasattr(self.__class__, attr):
            return object.__setattr__(self, attr, value)
        #then try to set it in the handler
        if hasattr(self._handler, attr) or not hasattr(self, attr):
            try:
                return setattr(self._handler, attr, value)
            except AttributeError:
                pass #will try to create the attribute on self
        return object.__setattr__(self, attr, value)
class FunctionWrapper(object):
    """Wrap a callable so a class can store it without binding it.

    Storing a plain function as a class attribute would turn it into a
    method; storing a FunctionWrapper keeps it an ordinary callable.
    """
    def __init__(self, Function):
        """Remember the callable to forward to."""
        self.Function = Function

    def __call__(self, *args, **kwargs):
        """Invoke the wrapped callable with the given arguments."""
        return self.Function(*args, **kwargs)
class ConstraintError(Exception):
    """Raised when constraint on a container is violated."""
    # Raised by ConstrainedContainer and its subclasses when an item or
    # sequence fails validation against the container's Constraint.
    pass
class ConstrainedContainer(object):
    """Mixin class providing constraint checking to a container.

    Container should have a Constraint property that __contains__ the items
    that will be allowed in the container. Can also have a Mask property that
    contains a function that will be applied to each item (a) on checking the
    item for validity, and (b) on inserting the item in the container.

    WARNING: Because the Mask is evaluated both when the item is checked and
    when it is inserted, any side-effects it has are applied _twice_. This
    means that any Mask that mutates the object or changes global variables
    is unlikely to do what you want!
    """
    # class-level defaults: no constraint, identity mask (wrapped in
    # FunctionWrapper so it is stored as plain data, not an unbound method)
    _constraint = None
    Mask = FunctionWrapper(identity)

    def _mask_for_new(self):
        """Returns self.Mask only if different from class data."""
        # identity comparison: a per-instance Mask shadows the class default
        if self.Mask is not self.__class__.Mask:
            return self.Mask
        else:
            return None

    def __init__(self, Constraint=None, Mask=None):
        """Returns new ConstrainedContainer, incorporating constraint.

        WARNING: Does not perform validation. It is the subclass's
        responsibility to perform validation during __init__ or __new__!
        """
        if Constraint is not None:
            self._constraint = Constraint
        if Mask is not None:
            self.Mask = Mask

    def matchesConstraint(self, constraint):
        """Returns True if all items in self are allowed."""
        #First checks if constraints are compatible. If not, or if the current
        #sequence has no constraint, does item by item search.

        #bail out if self or constraint is empty
        if not constraint or not self:
            return True
        #try checking constraints for compatibility
        if self.Constraint:
            try:
                constraint_ok = True
                for c in self.Constraint:
                    if c not in constraint:
                        constraint_ok = False
                        break
                if constraint_ok:
                    return True
            except TypeError:
                pass #e.g. tried to check wrong type item in string alphabet

        #get here if either self.Constraint is empty, or if we found an item
        #in self.Constraint that wasn't in the other constraint. In either
        #case, we need to check self item by item.
        if self:
            try:
                for i in self:
                    if i not in constraint:
                        return False
            except TypeError: #e.g. tried to check int in string alphabet
                return False
        return True

    def otherIsValid(self, other):
        """Returns True if other has only items allowed in self.Constraint."""
        #First, checks other.Constraint for compatibility.
        #If other.Constraint is incompatible, checks items in other.
        mask = self.Mask
        constraint = self.Constraint
        if not constraint or not other:
            return True #bail out if empty
        try:
            #if other has a constraint, check whether it's compatible
            other_constraint = other.Constraint
            if other_constraint:
                for c in map(mask, other_constraint):
                    if c not in constraint:
                        # internal signal only; caught just below
                        raise ConstraintError
                return True
        except (ConstraintError, AttributeError, TypeError):
            pass
        #get here if other doesn't have a constraint or if other's constraint
        #isn't valid on self's constraint.
        try:
            for item in map(mask, other):
                if item not in constraint:
                    return False
        except TypeError:
            return False #e.g. tried to check int in str alphabet
        return True

    def itemIsValid(self, item):
        """Returns True if single item is in self.Constraint."""
        try:
            if (not self.Constraint) or self.Mask(item) in self.Constraint:
                return True
            else:
                return False
        except (TypeError, ConstraintError): #wrong type or not allowed
            return False

    def sequenceIsValid(self, sequence):
        """Returns True if all items in sequence are in self.Constraint."""
        is_valid = self.itemIsValid
        for i in map(self.Mask, sequence):
            if not is_valid(i):
                return False
        return True

    def _get_constraint(self):
        """Accessor for constraint."""
        return self._constraint

    def _set_constraint(self, constraint):
        """Mutator for constraint.

        Validates existing contents against the new constraint first;
        raises ConstraintError if they don't fit.
        """
        if self.matchesConstraint(constraint):
            self._constraint = constraint
        else:
            raise ConstraintError, \
            "Sequence '%s' incompatible with constraint '%s'" % (self, constraint)

    Constraint = property(_get_constraint, _set_constraint)
class ConstrainedString(str, ConstrainedContainer):
    """String that is always valid on a specified constraint."""

    def __new__(cls, data, Constraint=None, Mask=None):
        """Constructor class method for validated ConstrainedString.

        Masks each character of data, then validates every character of
        the result against the constraint; raises ConstraintError on the
        first invalid character.
        """
        mask = Mask or cls.Mask
        if data == '':
            pass #map can't handle an empty sequence, sadly...
        elif isinstance(data, str):
            data = ''.join(map(mask, data))
        else:
            try:
                data = str(map(mask, data))
            except (TypeError, ValueError):
                #data wasn't mappable: treat it as a single item
                data = str(mask(data))
        new_string = str.__new__(cls, data)
        curr_constraint = Constraint or new_string.Constraint
        if curr_constraint and new_string:
            #validate each character against the constraint
            for c in new_string:
                try:
                    is_valid = c in curr_constraint
                except TypeError:
                    is_valid = False
                if not is_valid:
                    raise ConstraintError, \
                    "Character '%s' not in constraint '%s'" % (c, curr_constraint)
        return new_string

    def __init__(self, data, Constraint=None, Mask=None):
        """Constructor for validated ConstrainedString.

        Validation already happened in __new__; this just records the
        Constraint and Mask on the instance.
        """
        ConstrainedContainer.__init__(self, Constraint, Mask)

    def __add__(self, other):
        """Returns copy of self added to copy of other if constraint correct."""
        if not self.otherIsValid(other):
            raise ConstraintError, \
            "Sequence '%s' doesn't meet constraint" % other
        result = self.__class__(str(self) + ''.join(map(self.Mask, other)), \
            Constraint=self.Constraint)
        #propagate a per-instance Mask (if any) onto the new string
        mask = self._mask_for_new()
        if mask:
            result.Mask = mask
        return result

    def __mul__(self, multiplier):
        """Returns copy of self multiplied by multiplier."""
        result = self.__class__(str.__mul__(self, multiplier),
            Constraint=self.Constraint)
        mask = self._mask_for_new()
        if mask:
            result.Mask = mask
        return result

    def __rmul__(self, multiplier):
        """Returns copy of self multiplied by multiplier."""
        result = self.__class__(str.__rmul__(self, multiplier),
            Constraint=self.Constraint)
        mask = self._mask_for_new()
        if mask:
            result.Mask = mask
        return result

    def __getslice__(self, *args, **kwargs):
        """Make sure slice remembers the constraint.

        NOTE: __getslice__ only exists (and is only called) on Python 2.
        """
        result = self.__class__(str.__getslice__(self, *args, **kwargs), \
            Constraint=self.Constraint)
        mask = self._mask_for_new()
        if mask:
            result.Mask = mask
        return result

    def __getitem__(self, index):
        """Make sure extended slice remembers the constraint.

        Single-character indexing returns a plain str, not a new
        constrained instance.
        """
        items = str.__getitem__(self, index)
        if isinstance(index, slice):
            mask = self._mask_for_new()
            result = self.__class__(items, Constraint=self.Constraint)
            if mask:
                result.Mask = mask
            return result
        else:
            return items
class MappedString(ConstrainedString):
    """ConstrainedString variant whose membership test applies the Mask."""

    def __contains__(self, item):
        """Mask item before the containment check; False if masking fails."""
        try:
            masked = self.Mask(item)
            return super(MappedString, self).__contains__(masked)
        except (TypeError, ValueError):
            return False
class ConstrainedList(ConstrainedContainer, list):
    """List whose items must always satisfy the current constraint."""

    def __init__(self, data=None, Constraint=None, Mask=None):
        """Initialize, validating data (if any) via extend."""
        ConstrainedContainer.__init__(self, Constraint, Mask)
        if data:
            self.extend(data)

    def __add__(self, other):
        """Return a new list: self plus the masked items of other.

        Validation happens inside the constructor's extend call.
        """
        combined = list(self) + [self.Mask(x) for x in other]
        result = self.__class__(combined, Constraint=self.Constraint)
        new_mask = self._mask_for_new()
        if new_mask:
            result.Mask = new_mask
        return result

    def __iadd__(self, other):
        """Append masked other in place; raise ConstraintError if invalid."""
        masked = [self.Mask(x) for x in other]
        if not self.otherIsValid(masked):
            raise ConstraintError(
                "Sequence '%s' has items not in constraint '%s'"
                % (masked, self.Constraint))
        return list.__iadd__(self, masked)

    def __mul__(self, multiplier):
        """Return self repeated multiplier times, keeping the constraint."""
        result = self.__class__(list(self) * multiplier,
            Constraint=self.Constraint)
        new_mask = self._mask_for_new()
        if new_mask:
            result.Mask = new_mask
        return result

    def __rmul__(self, multiplier):
        """Return multiplier * self, keeping the constraint."""
        result = self.__class__(list(self) * multiplier,
            Constraint=self.Constraint)
        new_mask = self._mask_for_new()
        if new_mask:
            result.Mask = new_mask
        return result

    def __setitem__(self, index, item):
        """Set self[index] to validated, masked item(s); slice-aware."""
        if isinstance(index, slice):
            if not self.otherIsValid(item):
                raise ConstraintError(
                    "Sequence '%s' contains items not in constraint '%s'." %
                    (item, self.Constraint))
            item = [self.Mask(x) for x in item]
        else:
            if not self.itemIsValid(item):
                raise ConstraintError("Item '%s' not in constraint '%s'" %
                    (item, self.Constraint))
            item = self.Mask(item)
        list.__setitem__(self, index, item)

    def append(self, item):
        """Append a single validated, masked item."""
        if not self.itemIsValid(item):
            raise ConstraintError("Item '%s' not in constraint '%s'" %
                (item, self.Constraint))
        list.append(self, self.Mask(item))

    def extend(self, sequence):
        """Extend self with a validated, masked sequence."""
        if not self.otherIsValid(sequence):
            raise ConstraintError(
                "Some items in '%s' not in constraint '%s'"
                % (sequence, self.Constraint))
        list.extend(self, [self.Mask(x) for x in sequence])

    def insert(self, position, item):
        """Insert a validated, masked item at position."""
        if not self.itemIsValid(item):
            raise ConstraintError("Item '%s' not in constraint '%s'" %
                (item, self.Constraint))
        list.insert(self, position, self.Mask(item))

    def __getslice__(self, *args, **kwargs):
        """Return a slice that keeps the constraint (Py2 slice protocol)."""
        piece = self.__class__(list.__getslice__(self, *args, **kwargs),
            Constraint=self.Constraint)
        new_mask = self._mask_for_new()
        if new_mask:
            piece.Mask = new_mask
        return piece

    def __setslice__(self, start, end, sequence):
        """Assign a validated, masked sequence to self[start:end]."""
        if not self.otherIsValid(sequence):
            raise ConstraintError(
                "Sequence '%s' has items not in constraint '%s'"
                % (sequence, self.Constraint))
        list.__setslice__(self, start, end,
            [self.Mask(x) for x in sequence])
class MappedList(ConstrainedList):
    """ConstrainedList variant whose membership test maps items first."""

    def __contains__(self, item):
        """Mask item, then test membership.

        Returns False when masking or the lookup raises TypeError or
        ValueError.
        """
        try:
            return super(MappedList, self).__contains__(self.Mask(item))
        except (TypeError, ValueError):
            return False
class ConstrainedDict(ConstrainedContainer, dict):
    """Dict whose keys must always satisfy the current constraint.

    When fed a non-mapping sequence it stores counts of the items in that
    sequence -- not the standard dict interface (which would raise
    ValueError) but surprisingly useful in practice.
    """
    # maps values before storage; identity unless overridden
    ValueMask = FunctionWrapper(identity)

    def _get_mask_and_valmask(self):
        """Return (Mask, ValueMask), substituting None for class defaults."""
        mask = None if self.Mask is self.__class__.Mask else self.Mask
        if self.ValueMask is self.__class__.ValueMask:
            valmask = None
        else:
            valmask = self.ValueMask
        return mask, valmask

    def __init__(self, data=None, Constraint=None, Mask=None, ValueMask=None):
        """Initialize; data may be a mapping, or a sequence to be counted."""
        ConstrainedContainer.__init__(self, Constraint, Mask)
        if ValueMask is not None:
            self.ValueMask = ValueMask
        if not data:
            return
        try:
            self.update(data)
        except (ValueError, TypeError):
            # not dict-like: count occurrences of each masked item instead
            for key in map(self.Mask, iterable(data)):
                self[key] = self.get(key, 0) + 1

    def __setitem__(self, key, value):
        """Store masked key/value; raise ConstraintError on invalid key."""
        if not self.itemIsValid(key):
            raise ConstraintError("Item '%s' not in constraint '%s'" %
                (key, self.Constraint))
        dict.__setitem__(self, self.Mask(key), self.ValueMask(value))

    def copy(self):
        """Return a copy of self carrying constraint and both masks."""
        mask, valmask = self._get_mask_and_valmask()
        return self.__class__(self, Constraint=self.Constraint, Mask=mask,
            ValueMask=valmask)

    def fromkeys(self, keys, value=None):
        """Return a new dict over keys with self's constraint and masks."""
        mask, valmask = self._get_mask_and_valmask()
        return self.__class__(dict.fromkeys(keys, value),
            Constraint=self.Constraint, Mask=mask, ValueMask=valmask)

    def setdefault(self, key, default=None):
        """Return self[key], first setting self[key]=default if absent."""
        key = self.Mask(key)
        default = self.ValueMask(default)
        if key not in self:
            self[key] = default
        return self[key]

    def update(self, other):
        """Update self with other's key/value pairs.

        Goes through __setitem__, so constraint checks and masks are
        applied there.
        """
        if not hasattr(other, 'keys'):
            other = dict(other)
        for key in other:
            self[key] = other[key]
class MappedDict(ConstrainedDict):
    """ConstrainedDict variant that maps keys on lookup operations."""

    def __contains__(self, item):
        """Mask item, then test key membership; False if masking fails."""
        try:
            return super(MappedDict, self).__contains__(self.Mask(item))
        except (TypeError, ValueError):
            return False

    def __getitem__(self, item):
        """Return the value stored under the masked key."""
        return super(MappedDict, self).__getitem__(self.Mask(item))

    def get(self, item, default=None):
        """Return the value for the masked key, or default if absent."""
        return super(MappedDict, self).get(self.Mask(item), default)

    def has_key(self, item):
        """Py2-style membership test on the masked key."""
        return super(MappedDict, self).has_key(self.Mask(item))
def getNewId(rand_f=randrange):
    """Return a random 12-digit numeric string usable as an id.

    rand_f: function called as rand_f(10) to draw each digit; defaults to
        random.randrange, giving digits 0-9.

    Note: the result is a string of digits, not an int (the original
    docstring claimed an integer), so leading zeros are preserved.
    """
    NUM_DIGITS = 12
    return ''.join(str(rand_f(10)) for i in range(NUM_DIGITS))
#end function getNewId
def generateCombinations(alphabet, combination_length):
    """Return a list of strings: all combinations of a given length.

    alphabet: a sequence (string or list) containing the characters that
        can be used to make the combinations.
    combination_length: an int-castable value specifying the length of
        desired combinations.

    Raises RuntimeError when the parameters cannot be interpreted.
    Combinations are generated by counting in base len(alphabet) with the
    least-significant digit first, so the first character varies fastest.
    """
    try:
        alphabet_len = len(alphabet)
        combination_length = int(combination_length)
    # bug fix: original 'except TypeError, ValueError:' caught only
    # TypeError and bound it to the name ValueError
    except (TypeError, ValueError):
        raise RuntimeError("Bad parameter: alphabet must be of sequence "
                           "type and combination_length must be castable "
                           "to long.")
    # the number of combinations is alphabet length raised to the
    # combination length; zero length yields no combinations
    num_combs = 0
    if combination_length != 0:
        num_combs = pow(alphabet_len, combination_length)
    found_combs = []
    for comb_num in range(num_combs):
        # decompose comb_num into base-alphabet_len digits (little-endian);
        # '//' keeps integer division under both Python 2 and 3
        digits = [0] * combination_length
        remainder = comb_num
        position = 0
        while remainder:
            digits[position] = remainder % alphabet_len
            remainder = remainder // alphabet_len
            position += 1
        # translate the digit list into characters from the alphabet
        found_combs.append("".join([alphabet[d] for d in digits]))
    return found_combs
#end generateCombinations
def toString(obj):
    """Return a "slot: value; ..." string of obj's local attributes.

    Looks only at obj.__dict__, skipping names that start with "__"
    (public and single-underscore entries are kept) and skipping any
    value that is a method, function, or (old-style) class. A value whose
    str() looks like a memory-address printout (starts with '<' and ends
    with '>') has its value replaced with the word "object".
    """
    # Build the tuple of ignorable types from the names available in this
    # Python version; the original hard-coded Py2-only names (ClassType,
    # UnboundMethodType), which breaks under newer interpreters.
    type_names = ['BuiltinFunctionType', 'BuiltinMethodType', 'ClassType',
                  'FunctionType', 'MethodType', 'UnboundMethodType']
    present = [getattr(types, name, None) for name in type_names]
    ignored_types = tuple([t for t in present if t is not None])
    result = []
    for slot in obj.__dict__:
        if slot.startswith("__"):
            continue
        attr = getattr(obj, slot)
        if isinstance(attr, ignored_types):
            continue
        attr_value = str(attr)
        if attr_value.startswith("<") and attr_value.endswith(">"):
            attr_value = "object"
        result.append(slot + ": " + attr_value)
    return "; ".join(result)
#end toString
# Exception raised when a parameter cannot be cast to a nonnegative int.
class NonnegIntError(ValueError):
    """Raised when a parameter cannot be cast to a nonnegative int."""

    def __init__(self, args=None):
        # mirror the historical behavior: stash args exactly as given
        self.args = args
#end NonnegIntError
def makeNonnegInt(n):
    """Cast n to a nonnegative int and return it, or raise NonnegIntError.

    The sign is discarded via abs(), so -5 returns 5.
    """
    try:
        return abs(int(n))
    # bug fix: the original used a bare 'except' (which also swallowed
    # KeyboardInterrupt etc.) and concatenated n -- possibly a non-string
    # -- with a str, raising TypeError instead of the intended error
    except (TypeError, ValueError):
        raise NonnegIntError(str(n) + " must be castable to a nonnegative int")
#end makeNonnegInt
def reverse_complement(seq, use_DNA=True):
    """Return the reverse complement of a DNA or RNA sequence string.

    seq: a string containing only ACGTU characters (either case).
    use_DNA: if True (default), A pairs with T; if False, RNA is assumed
        and A pairs with U.

    Raises ValueError when seq contains characters outside ACGTUacgtu.
    """
    bad_chars = set(seq) - set("ACGTUacgtu")
    if len(bad_chars) > 0:
        raise ValueError(
            "Only ACGTU characters may be passed to reverse_complement. Other "
            "characters were identified: %s. Use cogent.DNA.rc if you need to "
            "reverse complement ambiguous bases." % ''.join(bad_chars))
    # choose the base pairing appropriate for DNA or RNA
    if use_DNA:
        pairs = dict(zip("ACGTacgt", "TGCAtgca"))
    else:
        pairs = dict(zip("ACGUacgu", "UGCAugca"))
    # complement each base while walking the sequence backwards; bases
    # absent from the pairing table pass through unchanged, matching the
    # original str.translate semantics (e.g. 'U' in DNA mode)
    return "".join([pairs.get(base, base) for base in reversed(seq)])
# reverse_complement was previously called revComp, but that naming does
# not adhere to the PyCogent coding guidelines. The old name is kept as an
# alias so existing code does not break.
revComp = reverse_complement
#end revComp
def timeLimitReached(start_time, time_limit):
    """Return True if more than time_limit has elapsed since start_time.

    Both values are in the units of clock() -- presumably CPU seconds on
    most platforms.
    """
    elapsed = clock() - start_time
    return elapsed > time_limit
#end _time_limit_reached
def not_none(seq):
    """Return True if no item in seq is None (vacuously True for empty)."""
    return all(item is not None for item in seq)
#end not_none
def get_items_except(seq, indices, seq_constructor=None):
    """Return all items of seq whose positions are not in indices.

    seq: any sequence.
    indices: iterable of positions to drop.
    seq_constructor: callable used to rebuild the result; when falsy
        (default), strings are rejoined with ''.join and other sequences
        are rebuilt with their own class.
    """
    # frozenset gives O(1) membership tests; the original built a dict of
    # None values for the same purpose
    omit = frozenset(indices)
    result = [item for i, item in enumerate(seq) if i not in omit]
    if not seq_constructor:
        if isinstance(seq, str):
            return ''.join(result)
        else:
            seq_constructor = seq.__class__
    return seq_constructor(result)
#end get_items_except
def NestedSplitter(delimiters=[None], same_level=False,
    constructor=strip, filter_=False):
    """Return a splitter producing a (possibly nested) list from a str.

    delimiters: list of delimiters applied nestedly, one per level; an
        item may be a (delimiter_str, maxsplits) pair, where a negative
        maxsplits means split that many times from the right (rsplit).
    same_level: if True, leaf items are split whether or not they contain
        the delimiter (single-item lists are not collapsed back).
    constructor: callable applied to each split field (default strip);
        a falsy value disables it.
    filter_: if not False, used as the predicate to filter the splits
        (e.g. None rips off empty items, as in filter(None, ...)).

    Note: the line input to the parser is expected to be a str, unchecked.
    NOTE(review): the mutable default [None] is shared across calls, but
    the parser never mutates it, so this appears benign.
    """
    def parser(line, index=0):
        # split line with the delimiter for the current nesting level
        curr = delimiters[index]
        if isinstance(curr, (list, tuple)):
            try:
                delim, maxsplits = curr
            except ValueError:
                raise ValueError("delimiter tuple/list should be \
                [delimiter_str, maxsplits]")
            if maxsplits < 0:
                # negative maxsplits: split from the right-hand end
                result = line.rsplit(delim, -maxsplits)
            else:
                result = line.split(delim, maxsplits)
        else:
            result = line.split(curr)
        # modify each split field if a constructor was supplied
        if constructor:
            result = map(constructor, result)
        if filter_ != False: #allow filter(None,..) to rip off the empty items
            result = filter(filter_, result)
        # recurse into each field with the next delimiter, if any remain
        if index != len(delimiters) - 1: #not last delimiter
            result = [parser(f, index + 1) for f in result]
        # undo the split (collapse a one-item list back to the item) when
        # same_level is False; the first delimiter (index 0) is exempt
        if not same_level and index > 0 \
            and len(result) == 1 and isinstance(result[0], basestring):
            result = result[0]
        return result
    #parser.__doc__ = make_innerdoc(NestedSplitter, parser, locals())
    return parser
#end NestedSplitter
def app_path(app, env_variable='PATH'):
    """Return the full path to app, or False if it is not on env_variable.

    Works like `which`: the first directory in the search path that
    contains the app wins.
    """
    # strip off " characters, in case we got a FilePath object
    app = app.strip('"')
    for directory in getenv(env_variable).split(':'):
        candidate = join(directory, app)
        if exists(candidate):
            return candidate
    return False
# Symbolic result codes used by create_dir / handle_error_codes.
def get_create_dir_error_codes():
    """Return the mapping of symbolic create_dir result names to codes."""
    return {
        'NO_ERROR': 0,
        'DIR_EXISTS': 1,
        'FILE_EXISTS': 2,
        'OTHER_OS_ERROR': 3,
    }
def create_dir(dir_name, fail_on_exist=False, handle_errors_externally=False):
    """Create a dir safely and fail meaningfully.

    dir_name: name of directory to create.
    fail_on_exist: if True, treat an existing directory as an error.
    handle_errors_externally: if True, do not raise; return failure codes
        instead, so callers can e.g. hint the user at a --force_overwrite
        option.

    Return values (when no error is raised):
        0: dir could be safely made
        1: directory already existed
        2: a file with the same name exists
        3: any other unspecified OSError

    See qiime/denoiser.py for an example of how to use this mechanism.
    """
    codes = get_create_dir_error_codes()
    # pre-bind dir_name and the suppression flag so only the code is
    # passed at each call site below
    ror = curry(handle_error_codes, dir_name, handle_errors_externally)
    if not exists(dir_name):
        # nothing there yet; attempt creation
        try:
            makedirs(dir_name)
        except OSError:
            return ror(codes['OTHER_OS_ERROR'])
        return codes['NO_ERROR']
    if not isdir(dir_name):
        # an existing file shadows the requested directory name
        return ror(codes['FILE_EXISTS'])
    # the directory is already present
    if fail_on_exist:
        return ror(codes['DIR_EXISTS'])
    return codes['DIR_EXISTS']
def handle_error_codes(dir_name, supress_errors=False,
                       error_code=None):
    """Resolve a create_dir error code: return it or raise OSError.

    dir_name: name of directory that raised the error.
    supress_errors: if True, RETURN the error code; if False (default),
        raise OSError with a descriptive message. (Doc fix: the original
        docstring stated this backwards.)
    error_code: the code for the error; None is treated as NO_ERROR.
    """
    error_code_lookup = get_create_dir_error_codes()
    if error_code is None:
        error_code = error_code_lookup['NO_ERROR']
    error_strings = \
        {error_code_lookup['DIR_EXISTS']:
         "Directory already exists: %s" % dir_name,
         error_code_lookup['FILE_EXISTS']:
         "File with same name exists: %s" % dir_name,
         error_code_lookup['OTHER_OS_ERROR']:
         "Could not create output directory: %s. " % dir_name +
         "Check the permissions."}
    if error_code == error_code_lookup['NO_ERROR']:
        return error_code_lookup['NO_ERROR']
    if supress_errors:
        return error_code
    raise OSError(error_strings[error_code])
def remove_files(list_of_filepaths, error_on_missing=True):
    """Remove the given filepaths, optionally erroring if any are missing."""
    missing = []
    for path in list_of_filepaths:
        try:
            remove(path)
        except OSError:
            missing.append(path)
    if missing and error_on_missing:
        raise OSError(
            "Some filepaths were not accessible: %s" % '\t'.join(missing))
def get_random_directory_name(suppress_mkdir=False,
                              timestamp_pattern='%Y%m%d%H%M%S',
                              rand_length=20,
                              output_dir=None,
                              prefix='',
                              suffix='',
                              return_absolute_path=True):
    """Build a random directory name and (by default) create the directory.

    suppress_mkdir: only build the directory name, don't create the
        directory (default: False).
    timestamp_pattern: string passed to strftime() to generate the
        timestamp (pass '' to suppress the timestamp).
    rand_length: length of random string of characters.
    output_dir: the directory which should contain the random directory
        (default: current directory).
    prefix/suffix: literal text around the generated name.
    return_absolute_path: if False, return the local (relative) path to
        the new directory.

    Raises OSError when the directory cannot be created.
    """
    output_dir = output_dir or './'
    # Characters drawn for the random portion. Bug fix: the original
    # alphabets had typos ('j' missing and 'g' doubled; '8' missing and
    # '0' doubled), skewing the character distribution.
    lowercase = "abcdefghijklmnopqrstuvwxyz"
    picks = lowercase + lowercase.upper() + "0123456789"
    # Get a time stamp
    timestamp = datetime.now().strftime(timestamp_pattern)
    # Construct the directory name
    random_part = ''.join([choice(picks) for i in range(rand_length)])
    dirname = '%s%s%s%s' % (prefix, timestamp, random_part, suffix)
    dirpath = join(output_dir, dirname)
    abs_dirpath = abspath(dirpath)
    # Make the directory unless suppressed
    if not suppress_mkdir:
        try:
            makedirs(abs_dirpath)
        except OSError:
            raise OSError(
                "Cannot make directory %s. Do you have write access?" % dirpath)
    if return_absolute_path:
        return abs_dirpath
    return dirpath
def get_independent_coords(spans, random_tie_breaker=False):
    """Return non-overlapping spans from [(start, end, ...), ...].

    Spans may be decorated with arbitrary data after the end entry.
    random_tie_breaker: resolve an overlap by randomly keeping either the
        established span or the overlapping one; otherwise the established
        (first-seen) span always wins.
    """
    if len(spans) <= 1:
        return spans
    kept = spans[0]
    result = [kept]
    for candidate in spans[1:]:
        if candidate[0] < kept[1]:
            # candidate overlaps the span we last kept; note that 'kept'
            # deliberately stays put even if the random choice installs
            # the candidate, matching the original behavior
            if random_tie_breaker:
                result[-1] = [kept, candidate][randint(0, 1)]
            else:
                result[-1] = kept
        else:
            result.append(candidate)
            kept = candidate
    return result
def get_merged_overlapping_coords(start_end):
    """Merge overlapping [start, end] spans; assumes sorted by start.

    Touching spans (next start equal to previous end) are merged as well,
    since only a strictly greater start opens a new span.
    """
    merged = [start_end[0]]
    prev_end = merged[0][-1]
    for span_index in range(1, len(start_end)):
        curr_start, curr_end = start_end[span_index]
        if curr_start > prev_end:
            # disjoint: open a new span
            prev_end = curr_end
            merged.append([curr_start, curr_end])
        elif curr_end > prev_end:
            # overlaps and extends the current span
            prev_end = curr_end
            merged[-1][-1] = prev_end
        # else: lies completely within the previous span; nothing to do
    return merged
def get_run_start_indices(values, digits=None, converter_func=None):
    """Yield [start_index, value] for each run of distinct converted values.

    values: iterable of comparable values.
    digits: if given, values are rounded to this many digits before
        comparison. Mutually exclusive with converter_func.
    converter_func: optional callable applied to each value before
        comparison; defaults to the identity.
    """
    # bug fix: the original guard used truthiness ('digits and
    # converter_func'), so digits=0 combined with a converter_func slipped
    # through and the converter was silently ignored
    assert not (digits is not None and converter_func is not None), \
        'Cannot set both digits and converter_func'
    if digits is not None:
        converter_func = lambda x: round(x, digits)
    elif converter_func is None:
        converter_func = lambda x: x
    last_val = None
    for index, val in enumerate(values):
        val = converter_func(val)
        if val != last_val:
            yield [index, val]
        last_val = val
    return
def get_merged_by_value_coords(spans_value, digits=None):
    """Return adjacent spans merged if they have the same value.

    Assumes [(start, end, val), ..] structure and that spans_value is
    sorted in ascending order.

    Arguments:
        - digits: if None, any data can be handled and exact values are
          compared. Otherwise values are rounded to that many digits.

    NOTE(review): an empty spans_value raises IndexError on the first
    line, and the function relies on the loop variable 'index' after the
    loop, so at least one span is required -- confirm with callers.
    """
    assert len(spans_value[0]) == 3, 'spans_value must have 3 records per row'
    starts, ends, vals = zip(*spans_value)
    # one [start_index, value] per run of equal (possibly rounded) values
    indices_distinct_vals = get_run_start_indices(vals, digits=digits)
    data = []
    i = 0  # NOTE(review): unused; left as-is
    for index, val in indices_distinct_vals:
        start = starts[index]
        end = ends[index]
        prev_index = max(index-1, 0)
        # close off the previously merged span at the end coordinate of
        # the span just before this run started (IndexError is the
        # expected first-iteration case, when data is still empty)
        try:
            data[-1][1] = ends[prev_index]
        except IndexError:
            pass
        data.append([start, end, val])
    # extend the final merged span to the last end coordinate;
    # NOTE(review): 'index' is the loop leftover and enumerate indices are
    # always < len(ends), so this guard appears to always hold -- confirm
    if index < len(ends):
        data[-1][1] = ends[-1]
    return data
|