/usr/share/pyshared/ZODB/Connection.py is in python-zodb 1:3.9.7-2.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Database connection support
$Id: Connection.py 117033 2010-09-28 20:27:49Z jim $"""
import logging
import sys
import tempfile
import threading
import warnings
import os
import shutil
import time
from persistent import PickleCache
# interfaces
from persistent.interfaces import IPersistentDataManager
from ZODB.interfaces import IConnection
from ZODB.interfaces import IBlobStorage
from ZODB.interfaces import IMVCCStorage
from ZODB.blob import Blob, rename_or_copy_blob, remove_committed_dir
from transaction.interfaces import ISavepointDataManager
from transaction.interfaces import IDataManagerSavepoint
from transaction.interfaces import ISynchronizer
from zope.interface import implements
import transaction
import ZODB
from ZODB.blob import SAVEPOINT_SUFFIX
from ZODB.ConflictResolution import ResolvedSerial
from ZODB.ExportImport import ExportImport
from ZODB import POSException
from ZODB.POSException import InvalidObjectReference, ConnectionStateError
from ZODB.POSException import ConflictError, ReadConflictError
from ZODB.POSException import Unsupported, ReadOnlyHistoryError
from ZODB.POSException import POSKeyError
from ZODB.serialize import ObjectWriter, ObjectReader, myhasattr
from ZODB.utils import p64, u64, z64, oid_repr, positive_id
from ZODB import utils
global_reset_counter = 0


def resetCaches():
    """Request that connection caches be reset as connections are reopened.

    The request is recorded by bumping the module-level
    ``global_reset_counter``; each Connection remembers the value it saw
    when it was created (``_reset_counter``).

    Zope's refresh feature relies on this: after Python modules are
    reloaded, existing instances keep using the old class definitions.
    Calling resetCaches() asks ZODB to clear its caches so that
    instances loaded by subsequent connections use the new definitions.
    """
    global global_reset_counter
    global_reset_counter = global_reset_counter + 1
class Connection(ExportImport, object):
"""Connection to ZODB for loading and storing objects."""
implements(IConnection,
ISavepointDataManager,
IPersistentDataManager,
ISynchronizer)
_code_timestamp = 0
##########################################################################
# Connection methods, ZODB.IConnection
def __init__(self, db, cache_size=400, before=None, cache_size_bytes=0):
    """Create a new Connection.

    db -- the database this connection belongs to; its
        ``database_name``, ``storage`` and ``classFactory`` attributes
        are read here.
    cache_size -- handed to PickleCache together with cache_size_bytes
        (presumably the target number of cached objects -- confirm
        against persistent.PickleCache).
    before -- stored as ``self.before``; a non-None value marks this as
        a historical connection.
    cache_size_bytes -- handed to PickleCache (presumably the target
        cache size in bytes -- confirm against persistent.PickleCache).

    The connection starts out closed (``self.opened`` is None) and
    without a transaction manager.
    """

    self._log = logging.getLogger('ZODB.Connection')
    self._debug_info = ()

    self._db = db

    # historical connection
    self.before = before

    # Multi-database support
    self.connections = {self._db.database_name: self}

    storage = db.storage
    if IMVCCStorage.providedBy(storage):
        # Use a connection-specific storage instance.
        self._mvcc_storage = True
        storage = storage.new_instance()
    else:
        self._mvcc_storage = False

    # _storage and _normal_storage start out as the same object;
    # tpc_begin() always goes through _normal_storage.
    self._normal_storage = self._storage = storage
    self.new_oid = storage.new_oid
    self._savepoint_storage = None

    # Do we need to join a txn manager?
    self._needs_to_join = True
    self.transaction_manager = None
    self.opened = None  # time.time() when DB.open() opened us

    # Snapshot of the module-level counter bumped by resetCaches().
    self._reset_counter = global_reset_counter
    self._load_count = 0   # Number of objects unghosted
    self._store_count = 0  # Number of objects stored

    # Cache which can ghostify (forget the state of) objects not
    # recently used.  Its API is roughly that of a dict, with
    # additional gc-related and invalidation-related methods.
    self._cache = PickleCache(self, cache_size, cache_size_bytes)

    # The pre-cache is used by get() to avoid infinite loops when
    # objects immediately load their state when they get their
    # persistent data set.
    self._pre_cache = {}

    # List of all objects (not oids) registered as modified by the
    # persistence machinery, or by add(), or whose access caused a
    # ReadConflictError (just to be able to clean them up from the
    # cache on abort with the other modified objects).  All objects
    # of this list are either in _cache or in _added.
    self._registered_objects = []

    # Dict of oid->obj added explicitly through add().  Used as a
    # preliminary cache until commit time when objects are all moved
    # to the real _cache.  The objects are moved to _creating at
    # commit time.
    self._added = {}

    # During commit this is turned into a list, which receives
    # objects added as a side-effect of storing a modified object.
    self._added_during_commit = None

    # During commit, all objects go to either _modified or _creating:

    # Dict of oid->flag of new objects (without serial), either
    # added by add() or implicitly added (discovered by the
    # serializer during commit).  The flag is True for implicit
    # adding.  Used during abort to remove created objects from the
    # _cache, and by persistent_id to check that a new object isn't
    # reachable from multiple databases.
    self._creating = {}

    # List of oids of modified objects, which have to be invalidated
    # in the cache on abort and in other connections on finish.
    self._modified = []

    # _invalidated queues invalidate messages delivered from the DB.
    # _inv_lock prevents one thread from modifying the set while
    # another is processing invalidations.  All the invalidations
    # from a single transaction should be applied atomically, so
    # the lock must be held when reading _invalidated.

    # It sucks that we have to hold the lock to read _invalidated.
    # Normally, _invalidated is written by calling dict.update, which
    # will execute atomically by virtue of the GIL.  But some storage
    # might generate oids where hash or compare invokes Python code.  In
    # that case, the GIL can't save us.
    # Note:  since that was written, it was officially declared that the
    # type of an oid is str.  TODO:  remove the related now-unnecessary
    # critical sections (if any -- this needs careful thought).

    self._inv_lock = threading.Lock()
    self._invalidated = set()

    # Flag indicating whether the cache has been invalidated:
    self._invalidatedCache = False

    # We intend to prevent committing a transaction in which
    # ReadConflictError occurs.  _conflicts is the set of oids that
    # experienced ReadConflictError (kept as a dict and used for
    # membership tests).  Any time we raise ReadConflictError,
    # the oid should be added to this set, and we should be sure that the
    # object is registered.  Because it's registered, Connection.commit()
    # will raise ReadConflictError again (because the oid is in
    # _conflicts).
    self._conflicts = {}

    # _txn_time stores the upper bound on transactions visible to
    # this connection.  That is, all object revisions must be
    # written before _txn_time.  If it is None, then the current
    # revisions are acceptable.
    self._txn_time = None

    # To support importFile(), implemented in the ExportImport base
    # class, we need to run _importDuringCommit() from our commit()
    # method.  If _import is not None, it is a two-tuple of arguments
    # to pass to _importDuringCommit().
    self._import = None

    self._reader = ObjectReader(self, self._cache, self._db.classFactory)
def add(self, obj):
"""Add a new object 'obj' to the database and assign it an oid."""
if self.opened is None:
raise ConnectionStateError("The database connection is closed")
marker = object()
oid = getattr(obj, "_p_oid", marker)
if oid is marker:
raise TypeError("Only first-class persistent objects may be"
" added to a Connection.", obj)
elif obj._p_jar is None:
assert obj._p_oid is None
oid = obj._p_oid = self._storage.new_oid()
obj._p_jar = self
if self._added_during_commit is not None:
self._added_during_commit.append(obj)
self._register(obj)
# Add to _added after calling register(), so that _added
# can be used as a test for whether the object has been
# registered with the transaction.
self._added[oid] = obj
elif obj._p_jar is not self:
raise InvalidObjectReference(obj, obj._p_jar)
def get(self, oid):
"""Return the persistent object with oid 'oid'."""
if self.opened is None:
raise ConnectionStateError("The database connection is closed")
obj = self._cache.get(oid, None)
if obj is not None:
return obj
obj = self._added.get(oid, None)
if obj is not None:
return obj
obj = self._pre_cache.get(oid, None)
if obj is not None:
return obj
# This appears to be an MVCC violation because we are loading
# the must recent data when perhaps we shouldnt. The key is
# that we are only creating a ghost!
# A disadvantage to this optimization is that _p_serial cannot be
# trusted until the object has been loaded, which affects both MVCC
# and historical connections.
p, serial = self._storage.load(oid, '')
obj = self._reader.getGhost(p)
# Avoid infiniate loop if obj tries to load its state before
# it is added to the cache and it's state refers to it.
self._pre_cache[oid] = obj
obj._p_oid = oid
obj._p_jar = self
obj._p_changed = None
obj._p_serial = serial
self._pre_cache.pop(oid)
self._cache[oid] = obj
return obj
def cacheMinimize(self):
"""Deactivate all unmodified objects in the cache.
"""
for connection in self.connections.itervalues():
connection._cache.minimize()
# TODO: we should test what happens when cacheGC is called mid-transaction.
def cacheGC(self):
"""Reduce cache size to target size.
"""
for connection in self.connections.itervalues():
connection._cache.incrgc()
__onCloseCallbacks = None
def onCloseCallback(self, f):
"""Register a callable, f, to be called by close()."""
if self.__onCloseCallbacks is None:
self.__onCloseCallbacks = []
self.__onCloseCallbacks.append(f)
def close(self, primary=True):
    """Close the Connection.

    primary -- true for the connection close() was called on by the
        user.  A primary close also closes every other connection in
        ``self.connections`` (with primary=False) and returns this
        connection to the database pool; a non-primary close only
        marks the connection as closed.

    Raises ConnectionStateError if the connection is currently joined
    to a transaction.
    """

    if not self._needs_to_join:
        # We're currently joined to a transaction.
        raise ConnectionStateError("Cannot close a connection joined to "
                                   "a transaction")

    if self._cache is not None:
        self._cache.incrgc()  # This is a good time to do some GC

    # Call the close callbacks.  A failing callback is logged and does
    # not prevent the remaining callbacks from running.
    if self.__onCloseCallbacks is not None:
        for f in self.__onCloseCallbacks:
            try:
                f()
            except:  # except what?
                # Log the bound instance rather than the method when
                # the callback is a bound method.
                f = getattr(f, 'im_self', f)
                self._log.error("Close callback failed for %s", f,
                                exc_info=sys.exc_info())
        # Callbacks are one-shot: they are dropped after being called.
        self.__onCloseCallbacks = None

    self._debug_info = ()

    if self.opened:
        self.transaction_manager.unregisterSynch(self)

    if self._mvcc_storage:
        self._storage.sync(force=False)

    if primary:
        for connection in self.connections.values():
            if connection is not self:
                connection.close(False)

        # Return the connection to the pool.
        if self.opened is not None:
            self._db._returnToPool(self)

            # _returnToPool() set self.opened to None.
            # However, we can't assert that here, because self may
            # have been reused (by another thread) by the time we
            # get back here.
    else:
        self.opened = None
def db(self):
"""Returns a handle to the database this connection belongs to."""
return self._db
def isReadOnly(self):
"""Returns True if this connection is read only."""
if self.opened is None:
raise ConnectionStateError("The database connection is closed")
return self.before is not None or self._storage.isReadOnly()
def invalidate(self, tid, oids):
"""Notify the Connection that transaction 'tid' invalidated oids."""
if self.before is not None:
# This is a historical connection. Invalidations are irrelevant.
return
self._inv_lock.acquire()
try:
if self._txn_time is None:
self._txn_time = tid
elif (tid < self._txn_time) and (tid is not None):
raise AssertionError("invalidations out of order, %r < %r"
% (tid, self._txn_time))
self._invalidated.update(oids)
finally:
self._inv_lock.release()
def invalidateCache(self):
self._inv_lock.acquire()
try:
self._invalidatedCache = True
finally:
self._inv_lock.release()
@property
def root(self):
    """Return the database root object.

    The object stored under oid ``z64`` is fetched with get() and
    wrapped in a RootConvenience.
    """
    root_object = self.get(z64)
    return RootConvenience(root_object)
def get_connection(self, database_name):
"""Return a Connection for the named database."""
connection = self.connections.get(database_name)
if connection is None:
new_con = self._db.databases[database_name].open(
transaction_manager=self.transaction_manager,
before=self.before,
)
self.connections.update(new_con.connections)
new_con.connections = self.connections
connection = new_con
return connection
def _implicitlyAdding(self, oid):
"""Are we implicitly adding an object within the current transaction
This is used in a check to avoid implicitly adding an object
to a database in a multi-database situation.
See serialize.ObjectWriter.persistent_id.
"""
return (self._creating.get(oid, 0)
or
((self._savepoint_storage is not None)
and
self._savepoint_storage.creating.get(oid, 0)
)
)
def sync(self):
"""Manually update the view on the database."""
self.transaction_manager.abort()
self._storage_sync()
def getDebugInfo(self):
"""Returns a tuple with different items for debugging the
connection.
"""
return self._debug_info
def setDebugInfo(self, *args):
"""Add the given items to the debug information of this connection."""
self._debug_info = self._debug_info + args
def getTransferCounts(self, clear=False):
"""Returns the number of objects loaded and stored."""
res = self._load_count, self._store_count
if clear:
self._load_count = 0
self._store_count = 0
return res
# Connection methods
##########################################################################
##########################################################################
# Data manager (ISavepointDataManager) methods
def abort(self, transaction):
"""Abort a transaction and forget all changes."""
# The order is important here. We want to abort registered
# objects before we process the cache. Otherwise, we may un-add
# objects added in savepoints. If they've been modified since
# the savepoint, then they won't have _p_oid or _p_jar after
# they've been unadded. This will make the code in _abort
# confused.
self._abort()
if self._savepoint_storage is not None:
self._abort_savepoint()
self._tpc_cleanup()
def _abort(self):
"""Abort a transaction and forget all changes."""
for obj in self._registered_objects:
oid = obj._p_oid
assert oid is not None
if oid in self._added:
del self._added[oid]
if self._cache.get(oid) is not None:
del self._cache[oid]
del obj._p_jar
del obj._p_oid
if obj._p_changed:
obj._p_changed = False
else:
# Note: If we invalidate a non-ghostifiable object
# (i.e. a persistent class), the object will
# immediately reread its state. That means that the
# following call could result in a call to
# self.setstate, which, of course, must succeed.
# In general, it would be better if the read could be
# delayed until the start of the next transaction. If
# we read at the end of a transaction and if the
# object was invalidated during this transaction, then
# we'll read non-current data, which we'll discard
# later in transaction finalization. Unfortnately, we
# can only delay the read if this abort corresponds to
# a top-level-transaction abort. We can't tell if
# this is a top-level-transaction abort, so we have to
# go ahead and invalidate now. Fortunately, it's
# pretty unlikely that the object we are invalidating
# was invalidated by another thread, so the risk of a
# reread is pretty low.
self._cache.invalidate(oid)
def _tpc_cleanup(self):
"""Performs cleanup operations to support tpc_finish and tpc_abort."""
self._conflicts.clear()
self._needs_to_join = True
self._registered_objects = []
self._creating.clear()
# Process pending invalidations.
def _flush_invalidations(self):
if self._mvcc_storage:
# Poll the storage for invalidations.
invalidated = self._storage.poll_invalidations()
if invalidated is None:
# special value: the transaction is so old that
# we need to flush the whole cache.
self._cache.invalidate(self._cache.cache_data.keys())
elif invalidated:
self._cache.invalidate(invalidated)
self._inv_lock.acquire()
try:
# Non-ghostifiable objects may need to read when they are
# invalidated, so we'll quickly just replace the
# invalidating dict with a new one. We'll then process
# the invalidations after freeing the lock *and* after
# resetting the time. This means that invalidations will
# happen after the start of the transactions. They are
# subject to conflict errors and to reading old data.
# TODO: There is a potential problem lurking for persistent
# classes. Suppose we have an invalidation of a persistent
# class and of an instance. If the instance is
# invalidated first and if the invalidation logic uses
# data read from the class, then the invalidation could
# be performed with stale data. Or, suppose that there
# are instances of the class that are freed as a result of
# invalidating some object. Perhaps code in their __del__
# uses class data. Really, the only way to properly fix
# this is to, in fact, make classes ghostifiable. Then
# we'd have to reimplement attribute lookup to check the
# class state and, if necessary, activate the class. It's
# much worse than that though, because we'd also need to
# deal with slots. When a class is ghostified, we'd need
# to replace all of the slot operations with versions that
# reloaded the object when called. It's hard to say which
# is better or worse. For now, it seems the risk of
# using a class while objects are being invalidated seems
# small enough to be acceptable.
invalidated = dict.fromkeys(self._invalidated)
self._invalidated = set()
self._txn_time = None
if self._invalidatedCache:
self._invalidatedCache = False
invalidated = self._cache.cache_data.copy()
finally:
self._inv_lock.release()
self._cache.invalidate(invalidated)
# Now is a good time to collect some garbage.
self._cache.incrgc()
def tpc_begin(self, transaction):
"""Begin commit of a transaction, starting the two-phase commit."""
self._modified = []
# _creating is a list of oids of new objects, which is used to
# remove them from the cache if a transaction aborts.
self._creating.clear()
self._normal_storage.tpc_begin(transaction)
def commit(self, transaction):
"""Commit changes to an object"""
if self._savepoint_storage is not None:
# We first checkpoint the current changes to the savepoint
self.savepoint()
# then commit all of the savepoint changes at once
self._commit_savepoint(transaction)
# No need to call _commit since savepoint did.
else:
self._commit(transaction)
def _commit(self, transaction):
    """Store every registered object that needs storing.

    Raises ReadOnlyHistoryError on a historical connection,
    ConflictError when the whole cache or a changed object without
    ``_p_resolveConflict`` has been invalidated, ReadConflictError for
    an object that hit a read conflict earlier, and
    InvalidObjectReference for an object owned by another jar.
    """

    if self.before is not None:
        raise ReadOnlyHistoryError()

    if self._import:
        # We are importing an export file.  We always do this
        # while making a savepoint so we can copy export data
        # directly to our storage, typically a TmpStore.
        self._importDuringCommit(transaction, *self._import)
        self._import = None

    # Just in case an object is added as a side-effect of storing
    # a modified object.  If, for example, a __getstate__() method
    # calls add(), the newly added objects will show up in
    # _added_during_commit.  This sounds insane, but has actually
    # happened.

    self._added_during_commit = []

    if self._invalidatedCache:
        raise ConflictError()

    for obj in self._registered_objects:
        oid = obj._p_oid
        assert oid
        if oid in self._conflicts:
            raise ReadConflictError(object=obj)

        if obj._p_jar is not self:
            raise InvalidObjectReference(obj, obj._p_jar)
        elif oid in self._added:
            # Explicitly added objects have never been stored.
            assert obj._p_serial == z64
        elif obj._p_changed:
            if oid in self._invalidated:
                # Only objects that can resolve conflicts may be
                # stored after having been invalidated.
                resolve = getattr(obj, "_p_resolveConflict", None)
                if resolve is None:
                    raise ConflictError(object=obj)
            self._modified.append(oid)
        else:
            # Nothing to do.  It's been said that it's legal, e.g., for
            # an object to set _p_changed to false after it's been
            # changed and registered.
            continue

        self._store_objects(ObjectWriter(obj), transaction)

    # The list may still grow while it is being iterated (add() appends
    # to it), so objects added during this loop are stored as well.
    for obj in self._added_during_commit:
        self._store_objects(ObjectWriter(obj), transaction)
    self._added_during_commit = None
def _store_objects(self, writer, transaction):
    """Serialize and store every object yielded by 'writer'.

    writer -- iterable of objects that also provides serialize(obj).
    transaction -- passed through to the storage's store()/storeBlob().

    Raises ConflictError for an existing object that has been
    invalidated and cannot resolve conflicts, Unsupported when a Blob
    is stored in a storage that does not provide IBlobStorage, and
    ValueError when a Blob is still open.
    """
    for obj in writer:
        oid = obj._p_oid
        serial = getattr(obj, "_p_serial", z64)

        if ((serial == z64)
            and
            ((self._savepoint_storage is None)
             or (oid not in self._savepoint_storage.creating)
             or self._savepoint_storage.creating[oid]
             )
            ):

            # obj is a new object

            # Because obj was added, it is now in _creating, so it
            # can be removed from _added.  If oid wasn't in
            # adding, then we are adding it implicitly.

            implicitly_adding = self._added.pop(oid, None) is None

            self._creating[oid] = implicitly_adding

        else:
            if (oid in self._invalidated
                and not hasattr(obj, '_p_resolveConflict')):
                raise ConflictError(object=obj)
            self._modified.append(oid)
        p = writer.serialize(obj)  # This calls __getstate__ of obj

        if isinstance(obj, Blob):
            if not IBlobStorage.providedBy(self._storage):
                raise Unsupported(
                    "Storing Blobs in %s is not supported." %
                    repr(self._storage))
            if obj.opened():
                raise ValueError("Can't commit with opened blobs.")
            s = self._storage.storeBlob(oid, serial, p,
                                        obj._uncommitted(),
                                        '', transaction)
            # we invalidate the object here in order to ensure
            # that the next attribute access of its name
            # unghostify it, which will cause its blob data
            # to be reattached "cleanly"
            obj._p_invalidate()
        else:
            s = self._storage.store(oid, serial, p, '', transaction)

        self._store_count += 1
        # Put the object in the cache before handling the
        # response, just in case the response contains the
        # serial number for a newly created object
        try:
            self._cache[oid] = obj
        except:
            # Dang, I bet it's wrapped:
            # TODO:  Deprecate, then remove, this.
            if hasattr(obj, 'aq_base'):
                self._cache[oid] = obj.aq_base
            else:
                raise

        # Record the pickle length as the object's size estimate, both
        # in the cache and on the object itself.
        self._cache.update_object_size_estimation(oid, len(p))
        obj._p_estimated_size = len(p)

        self._handle_serial(s, oid)
def _handle_serial(self, store_return, oid=None, change=1):
"""Handle the returns from store() and tpc_vote() calls."""
# These calls can return different types depending on whether
# ZEO is used. ZEO uses asynchronous returns that may be
# returned in batches by the ClientStorage. ZEO1 can also
# return an exception object and expect that the Connection
# will raise the exception.
# When conflict resolution occurs, the object state held by
# the connection does not match what is written to the
# database. Invalidate the object here to guarantee that
# the new state is read the next time the object is used.
if not store_return:
return
if isinstance(store_return, str):
assert oid is not None
self._handle_one_serial(oid, store_return, change)
else:
for oid, serial in store_return:
self._handle_one_serial(oid, serial, change)
def _handle_one_serial(self, oid, serial, change):
if not isinstance(serial, str):
raise serial
obj = self._cache.get(oid, None)
if obj is None:
return
if serial == ResolvedSerial:
del obj._p_changed # transition from changed to ghost
else:
if change:
obj._p_changed = 0 # transition from changed to up-to-date
obj._p_serial = serial
def tpc_abort(self, transaction):
    """Abort a two-phase commit.

    Discards any pending import and savepoint state, tells the storage
    to abort, invalidates the objects modified by this commit, disowns
    newly created and explicitly added objects, and resets the
    per-transaction bookkeeping.
    """
    if self._import:
        self._import = None

    if self._savepoint_storage is not None:
        self._abort_savepoint()

    self._storage.tpc_abort(transaction)

    # Note: If we invalidate a non-ghostifiable object (i.e. a
    # persistent class), the object will immediately reread its
    # state.  That means that the following call could result in a
    # call to self.setstate, which, of course, must succeed.  In
    # general, it would be better if the read could be delayed
    # until the start of the next transaction.  If we read at the
    # end of a transaction and if the object was invalidated
    # during this transaction, then we'll read non-current data,
    # which we'll discard later in transaction finalization.  We
    # could, theoretically queue this invalidation by calling
    # self.invalidate.  Unfortunately, attempts to make that
    # change resulted in mysterious test failures.  It's pretty
    # unlikely that the object we are invalidating was invalidated
    # by another thread, so the risk of a reread is pretty low.
    # It's really not worth the effort to pursue this.

    self._cache.invalidate(self._modified)
    self._invalidate_creating()
    # Objects still in _added were never stored: strip their identity
    # so they no longer belong to this connection.
    while self._added:
        oid, obj = self._added.popitem()
        if obj._p_changed:
            obj._p_changed = False
        del obj._p_oid
        del obj._p_jar
    self._tpc_cleanup()
def _invalidate_creating(self, creating=None):
"""Disown any objects newly saved in an uncommitted transaction."""
if creating is None:
creating = self._creating
for oid in creating:
o = self._cache.get(oid)
if o is not None:
del self._cache[oid]
if o._p_changed:
o._p_changed = False
del o._p_jar
del o._p_oid
creating.clear()
def tpc_vote(self, transaction):
"""Verify that a data manager can commit the transaction."""
try:
vote = self._storage.tpc_vote
except AttributeError:
return
s = vote(transaction)
self._handle_serial(s)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
def callback(tid):
if self._mvcc_storage:
# Inter-connection invalidation is not needed when the
# storage provides MVCC.
return
d = dict.fromkeys(self._modified)
self._db.invalidate(tid, d, self)
# It's important that the storage calls the passed function
# while it still has its lock. We don't want another thread
# to be able to read any updated data until we've had a chance
# to send an invalidation message to all of the other
# connections!
self._storage.tpc_finish(transaction, callback)
self._tpc_cleanup()
def sortKey(self):
"""Return a consistent sort key for this connection."""
return "%s:%s" % (self._storage.sortKey(), id(self))
# Data manager (ISavepointDataManager) methods
##########################################################################
##########################################################################
# Transaction-manager synchronization -- ISynchronizer
def beforeCompletion(self, txn):
# We don't do anything before a commit starts.
pass
# Call the underlying storage's sync() method (if any), and process
# pending invalidations regardless. Of course this should only be
# called at transaction boundaries.
def _storage_sync(self, *ignored):
sync = getattr(self._storage, 'sync', 0)
if sync:
sync()
self._flush_invalidations()
afterCompletion = _storage_sync
newTransaction = _storage_sync
# Transaction-manager synchronization -- ISynchronizer
##########################################################################
##########################################################################
# persistent.interfaces.IPersistentDatamanager
def oldstate(self, obj, tid):
    """Return the state of `obj` as written by transaction `tid`.

    `obj` must have this connection as its ``_p_jar``.  The record is
    fetched with the storage's ``loadSerial`` and decoded by the
    connection's reader.
    """
    assert obj._p_jar is self
    pickled = self._storage.loadSerial(obj._p_oid, tid)
    return self._reader.getState(pickled)
def setstate(self, obj):
    """Turn the ghost `obj` into a real object by loading its state.

    Raises ConnectionStateError (after logging) when the connection is
    closed.  A ConflictError raised while loading propagates without
    being logged; any other failure is logged and then re-raised.
    """
    oid = obj._p_oid

    if self.opened is None:
        described = oid_repr(oid)
        msg = ("Shouldn't load state for %s "
               "when the connection is closed" % described)
        self._log.error(msg)
        raise ConnectionStateError(msg)

    try:
        self._setstate(obj)
    except ConflictError:
        # Conflicts are expected; let them through unlogged.
        raise
    except:
        failure = sys.exc_info()
        self._log.error("Couldn't load state for %s", oid_repr(oid),
                        exc_info=failure)
        raise
def _setstate(self, obj):
    # Worker for setstate(), which adds logging of failures.
    #
    # The control flow is convoluted on purpose: we want to avoid
    # loading an object revision that we already know will be thrown
    # away.  That is why invalidation is tested both before and after
    # the load; only the test after the load is authoritative.
    #
    # When an object has been invalidated the options are, in order:
    #
    #   1. Check _p_independent()
    #   2. Try MVCC (non-current data)
    #   3. Raise ConflictError.
    #
    # _p_independent() support complicates this code; BTrees.Length
    # is a known user of it.
    oid = obj._p_oid

    if self.before is not None:
        # Historical connection: load the data that was current before
        # the time this connection is pinned to.
        record = self._storage.loadBefore(oid, self.before)
        if record is None:
            raise POSKeyError() # historical connection!
        data, serial, _end = record
    else:
        # Reading self._invalidated here is a harmless data race: it
        # may be updated by another thread, but the check is repeated
        # (under the lock) after the load anyway.
        if self._invalidatedCache:
            raise ReadConflictError()

        if (oid in self._invalidated and
            not myhasattr(obj, "_p_independent")):
            # Objects that do have _p_independent() are dealt with
            # after the load, below.
            self._load_before_or_conflict(obj)
            return

        data, serial = self._storage.load(oid, '')
        self._load_count += 1

        self._inv_lock.acquire()
        try:
            invalidated = oid in self._invalidated
        finally:
            self._inv_lock.release()

        if invalidated:
            if not myhasattr(obj, "_p_independent"):
                self._load_before_or_conflict(obj)
                return
            # Raises ReadConflictError when the object reports that it
            # is not independent.
            self._handle_independent(obj)

    self._reader.setGhostState(obj, data)
    obj._p_serial = serial
    size = len(data)
    self._cache.update_object_size_estimation(oid, size)
    obj._p_estimated_size = size

    # Blob support
    if isinstance(obj, Blob):
        obj._p_blob_uncommitted = None
        obj._p_blob_committed = self._storage.loadBlob(oid, serial)
def _load_before_or_conflict(self, obj):
    """Load non-current state for obj or raise ReadConflictError.

    When no non-current state is available the object is registered and
    its oid recorded in ``_conflicts`` before the error is raised.
    """
    if self._setstate_noncurrent(obj):
        return
    self._register(obj)
    self._conflicts[obj._p_oid] = True
    raise ReadConflictError(object=obj)
def _setstate_noncurrent(self, obj):
    """Set state using non-current data.

    Return True if state was available, False if not (the storage
    raised KeyError or returned None for the lookup).
    """
    oid = obj._p_oid
    txn_time = self._txn_time
    try:
        # Ask for the data that was current before the commit at
        # txn_time.
        record = self._storage.loadBefore(oid, txn_time)
    except KeyError:
        record = None
    if record is None:
        return False

    data, start, end = record
    # The revision must have been written before txn_time and still be
    # current at txn_time, although it may have been replaced exactly
    # at txn_time.
    assert start < txn_time, (u64(start), u64(txn_time))
    assert end is not None
    assert txn_time <= end, (u64(txn_time), u64(end))

    self._reader.setGhostState(obj, data)
    obj._p_serial = start

    # MVCC Blob support
    if isinstance(obj, Blob):
        obj._p_blob_uncommitted = None
        obj._p_blob_committed = self._storage.loadBlob(oid, start)
    return True
def _handle_independent(self, obj):
    # Helper for setstate() dealing with possibly independent objects.
    # If obj._p_independent() returns a true value, setstate() wins and
    # the oid is dropped from the invalidated set.  Otherwise the
    # conflict is recorded and ReadConflictError is raised.
    if not obj._p_independent():
        self._conflicts[obj._p_oid] = 1
        self._register(obj)
        raise ReadConflictError(object=obj)

    self._inv_lock.acquire()
    try:
        try:
            self._invalidated.remove(obj._p_oid)
        except KeyError:
            # Already gone from the invalidated set; nothing to undo.
            pass
    finally:
        self._inv_lock.release()
def register(self, obj):
    """Register obj with the current transaction manager.

    A subclass could override this method to customize the default
    policy of one transaction manager for each thread.

    obj must be an object loaded from this Connection.  Raises
    ValueError when obj has no oid.
    """
    assert obj._p_jar is self
    oid = obj._p_oid

    if oid is None:
        # The real complaint is that an object without an oid is being
        # registered.  The only known way to get here is assignment to
        # _p_jar; if there is another way, this message will be very
        # confusing.
        raise ValueError("assigning to _p_jar is not supported")

    if oid in self._added:
        # It was registered before it was added to _added.
        return

    self._register(obj)
def _register(self, obj=None):
    # Join the current transaction if that is still pending, then
    # remember obj (when one is given).
    #
    # The order matters: joining may take a savepoint, and that
    # savepoint must not reflect the change to the object, so the join
    # has to happen before the object is recorded.
    if self._needs_to_join:
        current = self.transaction_manager.get()
        current.join(self)
        self._needs_to_join = False

    if obj is None:
        return
    self._registered_objects.append(obj)
# persistent.interfaces.IPersistentDatamanager
##########################################################################
##########################################################################
# PROTECTED stuff (used by e.g. ZODB.DB.DB)
def _cache_items(self):
    # Return a list of (key, value) pairs ordered as
    # [ghosts....not recently used.....recently used].

    # Everything that is on the LRU list.
    lru = self._cache.lru_items()
    # Everything in the cache: some of it on the LRU list, some not.
    # NOTE(review): entries are deleted from this mapping below, so this
    # presumably relies on cache_data handing back a copy -- confirm.
    everything = self._cache.cache_data
    # Drop whatever is on the LRU list, leaving the rest.
    for key, _value in lru:
        del everything[key]
    return everything.items() + lru
def open(self, transaction_manager=None, delegate=True):
    """Open this Connection for use with a transaction manager.

    This method is called by the DB every time a Connection
    is opened.  Any invalidations received while the Connection
    was closed will be processed.

    If the global module function resetCaches() was called, the
    cache will be cleared.

    Parameters:
    transaction_manager: transaction manager to use.  None means
        use the default transaction manager.  The connection is
        registered with it as a synchronizer.
    delegate: when true, every other connection found in
        ``self.connections`` is opened too, with the same transaction
        manager and with delegation switched off.
    """
    self.opened = time.time()

    if transaction_manager is None:
        transaction_manager = transaction.manager
    self.transaction_manager = transaction_manager

    if self._reset_counter == global_reset_counter:
        self._flush_invalidations()
    else:
        # New code is in place.  Start a new cache.
        self._resetCache()

    transaction_manager.registerSynch(self)

    if self._cache is not None:
        # This is a good time to do some GC
        self._cache.incrgc()

    if not delegate:
        return
    # delegate open to secondary connections
    for connection in self.connections.values():
        if connection is self:
            continue
        connection.open(transaction_manager, False)
def _resetCache(self):
    """Creates a new cache, discarding the old one.

    The new cache keeps the old cache's ``cache_size`` and
    ``cache_size_bytes``, and pending invalidations are forgotten.
    See the docstring for the resetCaches() function.
    """
    self._reset_counter = global_reset_counter
    self._invalidated.clear()
    self._invalidatedCache = False

    previous = self._cache
    fresh = PickleCache(self, previous.cache_size,
                        previous.cache_size_bytes)
    self._cache = fresh

    # Keep the reader (if there is one) pointing at the live cache.
    if getattr(self, '_reader', None) is not None:
        self._reader._cache = fresh
def _releaseStorage(self):
    """Tell the storage to release resources it's using.

    Only done when ``_mvcc_storage`` is set.
    """
    if not self._mvcc_storage:
        return
    self._storage.release()
##########################################################################
# Python protocol
def __repr__(self):
    """Return '<Connection at XXXXXXXX>' built from positive_id(self)."""
    address = positive_id(self)
    return '<Connection at %08x>' % (address,)
# Python protocol
##########################################################################
##########################################################################
# DEPRECATION candidates
# Subscript access on a connection is the same operation as get().
__getitem__ = get
def exchange(self, old, new):
    """Make `new` take over the oid of `old` in this connection.

    `new` gets old's oid and this connection as its jar, is marked
    changed, is registered, and replaces the cache entry for that oid.
    """
    # called by a ZClasses method that isn't executed by the test suite
    new._p_oid = oid = old._p_oid
    new._p_jar = self
    new._p_changed = 1
    self._register(new)
    self._cache[oid] = new
# DEPRECATION candidates
##########################################################################
##########################################################################
# DEPRECATED methods
# None at present.
# DEPRECATED methods
##########################################################################
#####################################################################
# Savepoint support
def savepoint(self):
    """Commit pending changes to temporary storage; return a Savepoint.

    The first call switches the connection over to a TmpStore layered
    on the normal storage.  The returned Savepoint remembers the
    temporary store's position plus copies of its index and creating
    mappings.
    """
    if self._savepoint_storage is None:
        self._savepoint_storage = TmpStore(self._normal_storage)
        self._storage = self._savepoint_storage

    self._creating.clear()
    self._commit(None)
    tmpstore = self._storage
    tmpstore.creating.update(self._creating)
    self._creating.clear()
    self._registered_objects = []

    snapshot = (
        tmpstore.position,
        tmpstore.index.copy(),
        tmpstore.creating.copy(),
        )
    result = Savepoint(self, snapshot)

    # The interface doesn't promise it, but savepoints are sometimes
    # used just to "break up" very long transactions, so as a pragmatic
    # matter this is a good moment to reduce the cache memory burden.
    self.cacheGC()

    return result
def _rollback(self, state):
    """Return the savepoint storage to `state`.

    `state` is the tuple of arguments passed on to the storage's
    reset().  Objects in the storage's index as it was *before* the
    reset are invalidated in the cache.
    """
    self._abort()
    self._registered_objects = []

    tmpstore = self._storage
    self._invalidate_creating(tmpstore.creating)
    # Grab the index before reset() replaces it.
    stale_index = tmpstore.index
    tmpstore.reset(*state)
    self._cache.invalidate(stale_index)
def _commit_savepoint(self, transaction):
    """Commit all changes made in savepoints and begin 2-phase commit.

    Every record held by the savepoint storage is stored into the
    normal storage as part of `transaction`.  The savepoint storage is
    closed afterwards, even if storing fails.
    """
    tmpstore = self._savepoint_storage
    self._storage = self._normal_storage
    self._savepoint_storage = None

    try:
        self._log.debug("Committing savepoints of size %s",
                        tmpstore.getSize())
        oids = tmpstore.index.keys()

        # Copy invalidating and creating info from temporary storage:
        self._modified.extend(oids)
        self._creating.update(tmpstore.creating)

        for oid in oids:
            data, serial = tmpstore.load(oid, tmpstore)

            cached = self._cache.get(oid, None)
            if cached is not None:
                size = len(data)
                self._cache.update_object_size_estimation(
                    cached._p_oid, size)
                cached._p_estimated_size = size

            if not isinstance(self._reader.getGhost(data), Blob):
                result = self._storage.store(oid, serial, data,
                                             '', transaction)
            else:
                blobfilename = tmpstore.loadBlob(oid, serial)
                result = self._storage.storeBlob(
                    oid, serial, data, blobfilename,
                    '', transaction)
                # Invalidate the object so that the next attribute
                # access unghostifies it, which causes its blob data
                # to be reattached "cleanly".
                self.invalidate(None, (oid, ))

            self._handle_serial(result, oid, change=False)
    finally:
        tmpstore.close()
def _abort_savepoint(self):
    """Discard all savepoint data.

    The connection goes back to its normal storage, every object in
    the savepoint storage's index is invalidated in the cache, and the
    savepoint storage is closed.
    """
    tmpstore = self._savepoint_storage
    self._invalidate_creating(tmpstore.creating)
    self._storage = self._normal_storage
    self._savepoint_storage = None

    # Note: invalidating a non-ghostifiable object (i.e. a persistent
    # class) makes it reread its state immediately.  So the call below
    # can end up in self.setstate, which, of course, must succeed.  It
    # would generally be better to delay that read until the start of
    # the next transaction: reading at the end of a transaction, when
    # the object was invalidated during it, yields non-current data
    # that transaction finalization discards again.  In theory the
    # invalidation could be queued by calling self.invalidate, but
    # attempts at that change produced mysterious test failures.  The
    # object being invalidated here is pretty unlikely to have been
    # invalidated by another thread, so the risk of a reread is low and
    # pursuing this is not worth the effort.

    # This happens *after* resetting the storage on purpose: if data
    # are read, they are read from the reset storage!
    self._cache.invalidate(tmpstore.index)
    tmpstore.close()
# Savepoint support
#####################################################################
class Savepoint:
    """Remembers a data manager together with the state to return to."""

    implements(IDataManagerSavepoint)

    def __init__(self, datamanager, state):
        self.datamanager, self.state = datamanager, state

    def rollback(self):
        """Ask the data manager to roll back to the remembered state."""
        manager = self.datamanager
        manager._rollback(self.state)
class TmpStore:
"""A storage-like thing to support savepoints."""
implements(IBlobStorage)
def __init__(self, storage):
self._storage = storage
for method in (
'getName', 'new_oid', 'getSize', 'sortKey', 'loadBefore',
'isReadOnly'
):
setattr(self, method, getattr(storage, method))
self._file = tempfile.TemporaryFile()
# position: current file position
# _tpos: file position at last commit point
self.position = 0L
# index: map oid to pos of last committed version
self.index = {}
self.creating = {}
self._blob_dir = None
def __len__(self):
return len(self.index)
def close(self):
self._file.close()
if self._blob_dir is not None:
remove_committed_dir(self._blob_dir)
self._blob_dir = None
def load(self, oid, version):
pos = self.index.get(oid)
if pos is None:
return self._storage.load(oid, '')
self._file.seek(pos)
h = self._file.read(8)
oidlen = u64(h)
read_oid = self._file.read(oidlen)
if read_oid != oid:
raise POSException.StorageSystemError('Bad temporary storage')
h = self._file.read(16)
size = u64(h[8:])
serial = h[:8]
return self._file.read(size), serial
def store(self, oid, serial, data, version, transaction):
# we have this funny signature so we can reuse the normal non-commit
# commit logic
assert version == ''
self._file.seek(self.position)
l = len(data)
if serial is None:
serial = z64
header = p64(len(oid)) + oid + serial + p64(l)
self._file.write(header)
self._file.write(data)
self.index[oid] = self.position
self.position += l + len(header)
return serial
def storeBlob(self, oid, serial, data, blobfilename, version,
transaction):
assert version == ''
serial = self.store(oid, serial, data, '', transaction)
targetpath = self._getBlobPath()
if not os.path.exists(targetpath):
os.makedirs(targetpath, 0700)
targetname = self._getCleanFilename(oid, serial)
rename_or_copy_blob(blobfilename, targetname, chmod=False)
def loadBlob(self, oid, serial):
"""Return the filename where the blob file can be found.
"""
if not IBlobStorage.providedBy(self._storage):
raise Unsupported(
"Blobs are not supported by the underlying storage %r." %
self._storage)
filename = self._getCleanFilename(oid, serial)
if not os.path.exists(filename):
return self._storage.loadBlob(oid, serial)
return filename
def openCommittedBlobFile(self, oid, serial, blob=None):
blob_filename = self.loadBlob(oid, serial)
if blob is None:
return open(blob_filename, 'rb')
else:
return ZODB.blob.BlobFile(blob_filename, 'r', blob)
def _getBlobPath(self):
blob_dir = self._blob_dir
if blob_dir is None:
blob_dir = tempfile.mkdtemp(dir=self.temporaryDirectory(),
prefix='savepoints')
self._blob_dir = blob_dir
return blob_dir
def _getCleanFilename(self, oid, tid):
return os.path.join(
self._getBlobPath(),
"%s-%s%s" % (utils.oid_repr(oid), utils.tid_repr(tid),
SAVEPOINT_SUFFIX,)
)
def temporaryDirectory(self):
return self._storage.temporaryDirectory()
def reset(self, position, index, creating):
self._file.truncate(position)
self.position = position
# Caution: We're typically called as part of a savepoint rollback.
# Other machinery remembers the index to restore, and passes it to
# us. If we simply bind self.index to `index`, then if the caller
# didn't pass a copy of the index, the caller's index will mutate
# when self.index mutates. This can be a disaster if the caller is a
# savepoint to which the user rolls back again later (the savepoint
# loses the original index it passed). Therefore, to be safe, we make
# a copy of the index here. An alternative would be to ensure that
# all callers pass copies. As is, our callers do not make copies.
self.index = index.copy()
self.creating = creating
class RootConvenience(object):
    """Attribute-style access to the items of a root mapping.

    ``rc.name`` reads, ``rc.name = v`` writes and ``del rc.name``
    removes ``root[name]``; a missing key surfaces as AttributeError.
    Calling the instance returns the wrapped mapping itself.
    """

    def __init__(self, root):
        # Go through __dict__: our own __setattr__ would store the
        # value into the mapping instead of on the instance.
        self.__dict__['_root'] = root

    def __getattr__(self, name):
        # Only reached when normal lookup fails.  If '_root' itself is
        # what is missing (an instance created without __init__, e.g.
        # while being copied or unpickled), reading self._root below
        # would re-enter this method without end.
        if name == '_root':
            raise AttributeError(name)
        try:
            return self._root[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, v):
        self._root[name] = v

    def __delattr__(self, name):
        try:
            del self._root[name]
        except KeyError:
            raise AttributeError(name)

    def __call__(self):
        """Return the wrapped root mapping."""
        return self._root

    def __repr__(self):
        # Sorted key names, cut at a word boundary once the listing
        # gets longer than 60 characters.
        names = " ".join(sorted(self._root))
        if len(names) > 60:
            names = names[:57].rsplit(' ', 1)[0] + ' ...'
        return "<root: %s>" % names
|